Commit ecf1031d authored by Vitaliy Filippov's avatar Vitaliy Filippov
Browse files

Implement mounting via stage directory

Previously, multiple containers with the same mounted volume resulted in multiple
FUSE processes. This behaviour broke parallel modifications from different
containers, consumed extra resources, and, after mounting via systemd was
introduced, made it impossible to mount the same volume into multiple containers
on the same host.

Now only one FUSE process is started per volume, per host.
parent 1305b20b
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -103,6 +103,9 @@ spec:
          volumeMounts:
            - name: plugin-dir
              mountPath: /csi
            - name: stage-dir
              mountPath: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi
              mountPropagation: "Bidirectional"
            - name: pods-mount-dir
              mountPath: /var/lib/kubelet/pods
              mountPropagation: "Bidirectional"
@@ -119,6 +122,10 @@ spec:
          hostPath:
            path: /var/lib/kubelet/plugins/ru.yandex.s3.csi
            type: DirectoryOrCreate
        - name: stage-dir
          hostPath:
            path: /var/lib/kubelet/plugins/kubernetes.io/csi/ru.yandex.s3.csi
            type: DirectoryOrCreate
        - name: pods-mount-dir
          hostPath:
            path: /var/lib/kubelet/pods
+1 −1
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ type driver struct {
}

var (
	vendorVersion = "v1.2.0"
	vendorVersion = "v1.34.6"
	driverName    = "ru.yandex.s3.csi"
)

+24 −14
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ package driver
import (
	"fmt"
	"os"
	"os/exec"
	"regexp"
	"strconv"

@@ -68,7 +69,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
	volumeID := req.GetVolumeId()
	targetPath := req.GetTargetPath()
	stagingTargetPath := req.GetStagingTargetPath()
	bucketName, prefix := volumeIDToBucketPrefix(volumeID)

	// Check arguments
	if req.GetVolumeCapability() == nil {
@@ -100,18 +100,12 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
	glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n",
		targetPath, readOnly, volumeID, attrib, mountFlags)

	s3, err := s3.NewClientFromSecret(req.GetSecrets())
	cmd := exec.Command("mount", "--bind", stagingTargetPath, targetPath)
	cmd.Stderr = os.Stderr
	glog.V(3).Infof("Binding volume %v from %v to %v", volumeID, stagingTargetPath, targetPath)
	out, err := cmd.Output()
	if err != nil {
		return nil, fmt.Errorf("failed to initialize S3 client: %s", err)
	}

	meta := getMeta(bucketName, prefix, req.VolumeContext)
	mounter, err := mounter.New(meta, s3.Config)
	if err != nil {
		return nil, err
	}
	if err := mounter.Mount(stagingTargetPath, targetPath, volumeID); err != nil {
		return nil, err
		return nil, fmt.Errorf("Error running mount --bind %v %v: %s", stagingTargetPath, targetPath, out)
	}

	glog.V(4).Infof("s3: volume %s successfully mounted to %s", volumeID, targetPath)
@@ -131,7 +125,7 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
		return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
	}

	if err := mounter.FuseUnmount(targetPath); err != nil {
	if err := mounter.Unmount(targetPath); err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	glog.V(4).Infof("s3: volume %s has been unmounted.", volumeID)
@@ -174,7 +168,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
	if err != nil {
		return nil, err
	}
	if err := mounter.Stage(stagingTargetPath); err != nil {
	if err := mounter.Mount(stagingTargetPath, volumeID); err != nil {
		return nil, err
	}

@@ -193,6 +187,22 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
		return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
	}

	proc, err := mounter.FindFuseMountProcess(stagingTargetPath)
	if err != nil {
		return nil, err
	}
	exists := false
	if proc == nil {
		exists, err = mounter.SystemdUnmount(volumeID)
		if exists && err != nil {
			return nil, err
		}
	}
	if !exists {
		err = mounter.FuseUnmount(stagingTargetPath)
	}
	glog.V(4).Infof("s3: volume %s has been unmounted from stage path %v.", volumeID, stagingTargetPath)

	return &csi.NodeUnstageVolumeResponse{}, nil
}

+32 −18
Original line number Diff line number Diff line
@@ -3,11 +3,11 @@ package mounter
import (
	"fmt"
	"os"
	"strings"
	"time"

	systemd "github.com/coreos/go-systemd/v22/dbus"
	dbus "github.com/godbus/dbus/v5"
	"github.com/golang/glog"

	"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
)
@@ -35,14 +35,6 @@ func newGeeseFSMounter(meta *s3.FSMeta, cfg *s3.Config) (Mounter, error) {
	}, nil
}

func (geesefs *geesefsMounter) Stage(stageTarget string) error {
	return nil
}

func (geesefs *geesefsMounter) Unstage(stageTarget string) error {
	return nil
}

func (geesefs *geesefsMounter) CopyBinary(from, to string) error {
	st, err := os.Stat(from)
	if err != nil {
@@ -87,7 +79,7 @@ func (geesefs *geesefsMounter) MountDirect(target string, args []string) error {
	return fuseMount(target, geesefsCmd, args)
}

func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
func (geesefs *geesefsMounter) Mount(target, volumeID string) error {
	fullPath := fmt.Sprintf("%s:%s", geesefs.meta.BucketName, geesefs.meta.Prefix)
	var args []string
	if geesefs.region != "" {
@@ -113,7 +105,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
	}
	conn, err := systemd.New()
	if err != nil {
		fmt.Printf("Failed to connect to systemd dbus service: %v, starting geesefs directly\n", err)
		glog.Errorf("Failed to connect to systemd dbus service: %v, starting geesefs directly", err)
		return geesefs.MountDirect(target, args)
	}
	defer conn.Close()
@@ -127,7 +119,7 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
	}
	args = append([]string{pluginDir+"/geesefs", "-f", "-o", "allow_other", "--endpoint", geesefs.endpoint}, args...)
	unitName := "geesefs-"+systemd.PathBusEscape(volumeID)+".service"
	props := []systemd.Property{
	newProps := []systemd.Property{
		systemd.Property{
			Name: "Description",
			Value: dbus.MakeVariant("GeeseFS mount for Kubernetes volume "+volumeID),
@@ -142,13 +134,35 @@ func (geesefs *geesefsMounter) Mount(source, target, volumeID string) error {
			Value: dbus.MakeVariant("inactive-or-failed"),
		},
	}
	_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
	if err != nil && strings.Contains(err.Error(), "already exists") {
	unitProps, err := conn.GetAllProperties(unitName)
	if err == nil {
		// Unit already exists
		if s, ok := unitProps["ActiveState"].(string); ok && (s == "active" || s == "activating" || s == "reloading") {
			// Unit is already active
			curPath := ""
			prevExec, ok := unitProps["ExecStart"].([][]interface{})
			if ok && len(prevExec) > 0 && len(prevExec[0]) >= 2 {
				execArgs, ok := prevExec[0][1].([]string)
				if ok && len(execArgs) >= 2 {
					curPath = execArgs[len(execArgs)-1]
				}
			}
			if curPath != target {
				return fmt.Errorf(
					"GeeseFS for volume %v is already mounted on host, but"+
					" in a different directory. We want %v, but it's in %v",
					volumeID, target, curPath,
				)
			}
			// Already mounted at right location
			return nil
		} else {
			// Stop and garbage collect the unit if automatic collection didn't work for some reason
			conn.StopUnit(unitName, "replace", nil)
			conn.ResetFailedUnit(unitName)
		_, err = conn.StartTransientUnit(unitName, "replace", props, nil)
		}
	}
	_, err = conn.StartTransientUnit(unitName, "replace", newProps, nil)
	if err != nil {
		return fmt.Errorf("Error starting systemd unit %s on host: %v", unitName, err)
	}
+34 −6
Original line number Diff line number Diff line
@@ -11,18 +11,18 @@ import (
	"syscall"
	"time"

	"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
	systemd "github.com/coreos/go-systemd/v22/dbus"
	"github.com/golang/glog"
	"github.com/mitchellh/go-ps"
	"k8s.io/kubernetes/pkg/util/mount"

	"github.com/yandex-cloud/k8s-csi-s3/pkg/s3"
)

// Mounter interface which can be implemented
// by the different mounter types
type Mounter interface {
	Stage(stagePath string) error
	Unstage(stagePath string) error
	Mount(source, target, volumeID string) error
	Mount(target, volumeID string) error
}

const (
@@ -70,12 +70,40 @@ func fuseMount(path string, command string, args []string) error {
	return waitForMount(path, 10*time.Second)
}

// Unmount unmounts path using a mounter created with mount.New("") and
// returns the unmount error, if any. It does not look for or wait on any
// process backing the mount.
func Unmount(path string) error {
	return mount.New("").Unmount(path)
}

// SystemdUnmount stops the systemd unit that runs GeeseFS for the given
// volume, if such a unit is currently running.
//
// The unit name is derived from volumeID as
// "geesefs-<PathBusEscape(volumeID)>.service".
//
// The boolean result is false only when connecting to systemd or listing the
// unit fails; the corresponding error is returned alongside it. In every
// other case it is true: when systemd reports no unit, or the unit is already
// "inactive" or "failed", nothing is stopped and the error is nil; otherwise
// the unit is stopped and the error is whatever StopUnit returned.
func SystemdUnmount(volumeID string) (bool, error) {
	conn, err := systemd.New()
	if err != nil {
		glog.Errorf("Failed to connect to systemd dbus service: %v", err)
		return false, err
	}
	defer conn.Close()

	unitName := "geesefs-" + systemd.PathBusEscape(volumeID) + ".service"
	units, err := conn.ListUnitsByNames([]string{unitName})
	if err != nil {
		glog.Errorf("Failed to list systemd unit by name %v: %v", unitName, err)
		return false, err
	}
	// Diagnostic output only: keep it at a verbose info level so that a
	// successful lookup does not show up as an error in the logs.
	glog.V(4).Infof("Got %v", units)

	// Nothing to stop if the unit is unknown or no longer running.
	if len(units) == 0 || units[0].ActiveState == "inactive" || units[0].ActiveState == "failed" {
		return true, nil
	}

	// NOTE(review): a nil result channel is passed, so per the go-systemd
	// documentation completion of the stop job is not awaited here — confirm
	// callers do not rely on the mount being gone when this returns.
	_, err = conn.StopUnit(unitName, "replace", nil)
	return true, err
}

func FuseUnmount(path string) error {
	if err := mount.New("").Unmount(path); err != nil {
		return err
	}
	// as fuse quits immediately, we will try to wait until the process is done
	process, err := findFuseMountProcess(path)
	process, err := FindFuseMountProcess(path)
	if err != nil {
		glog.Errorf("Error getting PID of fuse mount: %s", err)
		return nil
@@ -107,7 +135,7 @@ func waitForMount(path string, timeout time.Duration) error {
	}
}

func findFuseMountProcess(path string) (*os.Process, error) {
func FindFuseMountProcess(path string) (*os.Process, error) {
	processes, err := ps.Processes()
	if err != nil {
		return nil, err
Loading