Loading pkg/driver/controllerserver.go +10 −57 Original line number Diff line number Diff line Loading @@ -22,7 +22,6 @@ import ( "fmt" "io" "path" "regexp" "strings" "github.com/ctrox/csi-s3/pkg/mounter" Loading @@ -43,24 +42,9 @@ type controllerServer struct { func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { params := req.GetParameters() capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes()) mounterType := params[mounter.TypeKey] volumeID := sanitizeVolumeID(req.GetName()) bucketName := volumeID prefix := "" mountOptions := make([]string, 0) mountOptStr := params[mounter.OptionsKey] if mountOptStr != "" { re, _ := regexp.Compile(`([^\s"]+|"([^"\\]+|\\")*")+`) re2, _ := regexp.Compile(`"([^"\\]+|\\")*"`) re3, _ := regexp.Compile(`\\(.)`) for _, opt := range re.FindAll([]byte(mountOptStr), -1) { // Unquote options opt = re2.ReplaceAllFunc(opt, func(q []byte) []byte { return re3.ReplaceAll(q[1 : len(q)-1], []byte("$1")) }) mountOptions = append(mountOptions, string(opt)) } } // check if bucket name is overridden if nameOverride, ok := params[mounter.BucketKey]; ok { Loading @@ -84,14 +68,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol glog.V(4).Infof("Got a request to create volume %s", volumeID) meta := &s3.FSMeta{ BucketName: bucketName, Prefix: prefix, Mounter: mounterType, MountOptions: mountOptions, CapacityBytes: capacityBytes, } client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) Loading @@ -102,18 +78,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err) } if exists { // get meta, ignore errors as it could just mean meta does not exist yet m, err := client.GetFSMeta(bucketName, prefix) if err == nil { // Check if volume capacity requested is bigger than the already existing capacity if capacityBytes > m.CapacityBytes { return nil, status.Error( codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID), ) } } } else { if !exists { if err = client.CreateBucket(bucketName); err != nil { return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err) } Loading @@ -123,16 +88,19 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, fmt.Errorf("failed to create prefix %s: %v", prefix, err) } if err := client.SetFSMeta(meta); err != nil { return nil, fmt.Errorf("error setting bucket metadata: %w", err) } glog.V(4).Infof("create volume %s", volumeID) // DeleteVolume lacks VolumeContext, but publish&unpublish requests have it, // so we don't need to store additional metadata anywhere context := make(map[string]string) for k, v := range params { context[k] = v } context["capacity"] = fmt.Sprintf("%v", capacityBytes) return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ VolumeId: volumeID, CapacityBytes: capacityBytes, VolumeContext: req.GetParameters(), VolumeContext: context, }, }, nil } Loading @@ -140,7 +108,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volumeID := req.GetVolumeId() bucketName, prefix := volumeIDToBucketPrefix(volumeID) var meta *s3.FSMeta // Check arguments if len(volumeID) == 0 { Loading @@ -158,11 +125,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } if meta, err = client.GetFSMeta(bucketName, prefix); err != nil { glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID) return &csi.DeleteVolumeResponse{}, nil } var deleteErr error if prefix == "" { // prefix is empty, we delete the whole bucket Loading @@ -178,10 +140,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } if deleteErr != nil { glog.Warning("remove volume failed, will ensure fsmeta exists to avoid losing control over volume") if err := client.SetFSMeta(meta); err != nil { glog.Error(err) } return nil, deleteErr } Loading @@ -196,7 +154,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req if req.GetVolumeCapabilities() == nil { return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request") } bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId()) bucketName, _ := volumeIDToBucketPrefix(req.GetVolumeId()) client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { Loading @@ -212,11 +170,6 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId())) } if _, err := client.GetFSMeta(bucketName, prefix); err != nil { // return an error if the fsmeta of the requested volume does not exist return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId())) } // We currently only support RWO supportedAccessMode := &csi.VolumeCapability_AccessMode{ Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, Loading pkg/driver/nodeserver.go +32 −15 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ package driver import ( "fmt" "os" "regexp" "strconv" "github.com/ctrox/csi-s3/pkg/mounter" "github.com/ctrox/csi-s3/pkg/s3" Loading @@ -37,6 +39,31 @@ type nodeServer struct { *csicommon.DefaultNodeServer } func getMeta(bucketName, prefix string, context map[string]string) *s3.FSMeta { mountOptions := make([]string, 0) mountOptStr := context[mounter.OptionsKey] if mountOptStr != "" { re, _ := regexp.Compile(`([^\s"]+|"([^"\\]+|\\")*")+`) re2, _ := regexp.Compile(`"([^"\\]+|\\")*"`) re3, _ := regexp.Compile(`\\(.)`) for _, opt := range re.FindAll([]byte(mountOptStr), -1) { // Unquote options opt = re2.ReplaceAllFunc(opt, func(q []byte) []byte { return re3.ReplaceAll(q[1 : len(q)-1], []byte("$1")) }) mountOptions = append(mountOptions, string(opt)) } } capacity, _ := strconv.ParseInt(context["capacity"], 10, 64) return &s3.FSMeta{ BucketName: bucketName, Prefix: prefix, Mounter: context[mounter.TypeKey], MountOptions: mountOptions, CapacityBytes: capacity, } } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() Loading Loading @@ -65,29 +92,21 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis return &csi.NodePublishVolumeResponse{}, nil } deviceID := "" if req.GetPublishContext() != nil { deviceID = req.GetPublishContext()[deviceID] } // TODO: Implement readOnly & mountFlags readOnly := req.GetReadonly() // TODO: check if attrib is correct with context. attrib := req.GetVolumeContext() mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, deviceID, readOnly, volumeID, attrib, mountFlags) glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, readOnly, volumeID, attrib, mountFlags) s3, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } meta, err := s3.GetFSMeta(bucketName, prefix) if err != nil { return nil, err } meta := getMeta(bucketName, prefix, req.VolumeContext) mounter, err := mounter.New(meta, s3.Config) if err != nil { return nil, err Loading Loading @@ -150,10 +169,8 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } meta, err := client.GetFSMeta(bucketName, prefix) if err != nil { return nil, err } meta := getMeta(bucketName, prefix, req.VolumeContext) mounter, err := mounter.New(meta, client.Config) if err != nil { return nil, err Loading pkg/s3/client.go +0 −34 Original line number Diff line number Diff line Loading @@ -3,11 +3,8 @@ package s3 import ( "bytes" "context" "encoding/json" "fmt" "io" "net/url" "path" "github.com/golang/glog" "github.com/minio/minio-go/v7" Loading Loading @@ -221,34 +218,3 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { return nil } func (client *s3Client) SetFSMeta(meta *FSMeta) error { b := new(bytes.Buffer) json.NewEncoder(b).Encode(meta) opts := minio.PutObjectOptions{ContentType: "application/json"} _, err := client.minio.PutObject( client.ctx, meta.BucketName, path.Join(meta.Prefix, metadataName), b, int64(b.Len()), opts, ) return err } func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) { opts := minio.GetObjectOptions{} obj, err := client.minio.GetObject(client.ctx, bucketName, path.Join(prefix, metadataName), opts) if err != nil { return &FSMeta{}, err } objInfo, err := obj.Stat() if err != nil { return &FSMeta{}, err } b := make([]byte, objInfo.Size) _, err = obj.Read(b) if err != nil && err != io.EOF { return &FSMeta{}, err } var meta FSMeta err = json.Unmarshal(b, &meta) return &meta, err } Loading
pkg/driver/controllerserver.go +10 −57 Original line number Diff line number Diff line Loading @@ -22,7 +22,6 @@ import ( "fmt" "io" "path" "regexp" "strings" "github.com/ctrox/csi-s3/pkg/mounter" Loading @@ -43,24 +42,9 @@ type controllerServer struct { func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { params := req.GetParameters() capacityBytes := int64(req.GetCapacityRange().GetRequiredBytes()) mounterType := params[mounter.TypeKey] volumeID := sanitizeVolumeID(req.GetName()) bucketName := volumeID prefix := "" mountOptions := make([]string, 0) mountOptStr := params[mounter.OptionsKey] if mountOptStr != "" { re, _ := regexp.Compile(`([^\s"]+|"([^"\\]+|\\")*")+`) re2, _ := regexp.Compile(`"([^"\\]+|\\")*"`) re3, _ := regexp.Compile(`\\(.)`) for _, opt := range re.FindAll([]byte(mountOptStr), -1) { // Unquote options opt = re2.ReplaceAllFunc(opt, func(q []byte) []byte { return re3.ReplaceAll(q[1 : len(q)-1], []byte("$1")) }) mountOptions = append(mountOptions, string(opt)) } } // check if bucket name is overridden if nameOverride, ok := params[mounter.BucketKey]; ok { Loading @@ -84,14 +68,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol glog.V(4).Infof("Got a request to create volume %s", volumeID) meta := &s3.FSMeta{ BucketName: bucketName, Prefix: prefix, Mounter: mounterType, MountOptions: mountOptions, CapacityBytes: capacityBytes, } client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) Loading @@ -102,18 +78,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, fmt.Errorf("failed to check if bucket %s exists: %v", volumeID, err) } if exists { // get meta, ignore errors as it could just mean meta does not exist yet m, err := client.GetFSMeta(bucketName, prefix) if err == nil { // Check if volume capacity requested is bigger than the already existing capacity if capacityBytes > m.CapacityBytes { return nil, status.Error( codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with smaller size already exist", volumeID), ) } } } else { if !exists { if err = client.CreateBucket(bucketName); err != nil { return nil, fmt.Errorf("failed to create bucket %s: %v", bucketName, err) } Loading @@ -123,16 +88,19 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, fmt.Errorf("failed to create prefix %s: %v", prefix, err) } if err := client.SetFSMeta(meta); err != nil { return nil, fmt.Errorf("error setting bucket metadata: %w", err) } glog.V(4).Infof("create volume %s", volumeID) // DeleteVolume lacks VolumeContext, but publish&unpublish requests have it, // so we don't need to store additional metadata anywhere context := make(map[string]string) for k, v := range params { context[k] = v } context["capacity"] = fmt.Sprintf("%v", capacityBytes) return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ VolumeId: volumeID, CapacityBytes: capacityBytes, VolumeContext: req.GetParameters(), VolumeContext: context, }, }, nil } Loading @@ -140,7 +108,6 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volumeID := req.GetVolumeId() bucketName, prefix := volumeIDToBucketPrefix(volumeID) var meta *s3.FSMeta // Check arguments if len(volumeID) == 0 { Loading @@ -158,11 +125,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } if meta, err = client.GetFSMeta(bucketName, prefix); err != nil { glog.V(5).Infof("FSMeta of volume %s does not exist, ignoring delete request", volumeID) return &csi.DeleteVolumeResponse{}, nil } var deleteErr error if prefix == "" { // prefix is empty, we delete the whole bucket Loading @@ -178,10 +140,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } if deleteErr != nil { glog.Warning("remove volume failed, will ensure fsmeta exists to avoid losing control over volume") if err := client.SetFSMeta(meta); err != nil { glog.Error(err) } return nil, deleteErr } Loading @@ -196,7 +154,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req if req.GetVolumeCapabilities() == nil { return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request") } bucketName, prefix := volumeIDToBucketPrefix(req.GetVolumeId()) bucketName, _ := volumeIDToBucketPrefix(req.GetVolumeId()) client, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { Loading @@ -212,11 +170,6 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req return nil, status.Error(codes.NotFound, fmt.Sprintf("bucket of volume with id %s does not exist", req.GetVolumeId())) } if _, err := client.GetFSMeta(bucketName, prefix); err != nil { // return an error if the fsmeta of the requested volume does not exist return nil, status.Error(codes.NotFound, fmt.Sprintf("fsmeta of volume with id %s does not exist", req.GetVolumeId())) } // We currently only support RWO supportedAccessMode := &csi.VolumeCapability_AccessMode{ Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, Loading
pkg/driver/nodeserver.go +32 −15 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ package driver import ( "fmt" "os" "regexp" "strconv" "github.com/ctrox/csi-s3/pkg/mounter" "github.com/ctrox/csi-s3/pkg/s3" Loading @@ -37,6 +39,31 @@ type nodeServer struct { *csicommon.DefaultNodeServer } func getMeta(bucketName, prefix string, context map[string]string) *s3.FSMeta { mountOptions := make([]string, 0) mountOptStr := context[mounter.OptionsKey] if mountOptStr != "" { re, _ := regexp.Compile(`([^\s"]+|"([^"\\]+|\\")*")+`) re2, _ := regexp.Compile(`"([^"\\]+|\\")*"`) re3, _ := regexp.Compile(`\\(.)`) for _, opt := range re.FindAll([]byte(mountOptStr), -1) { // Unquote options opt = re2.ReplaceAllFunc(opt, func(q []byte) []byte { return re3.ReplaceAll(q[1 : len(q)-1], []byte("$1")) }) mountOptions = append(mountOptions, string(opt)) } } capacity, _ := strconv.ParseInt(context["capacity"], 10, 64) return &s3.FSMeta{ BucketName: bucketName, Prefix: prefix, Mounter: context[mounter.TypeKey], MountOptions: mountOptions, CapacityBytes: capacity, } } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { volumeID := req.GetVolumeId() targetPath := req.GetTargetPath() Loading Loading @@ -65,29 +92,21 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis return &csi.NodePublishVolumeResponse{}, nil } deviceID := "" if req.GetPublishContext() != nil { deviceID = req.GetPublishContext()[deviceID] } // TODO: Implement readOnly & mountFlags readOnly := req.GetReadonly() // TODO: check if attrib is correct with context. attrib := req.GetVolumeContext() mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() glog.V(4).Infof("target %v\ndevice %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, deviceID, readOnly, volumeID, attrib, mountFlags) glog.V(4).Infof("target %v\nreadonly %v\nvolumeId %v\nattributes %v\nmountflags %v\n", targetPath, readOnly, volumeID, attrib, mountFlags) s3, err := s3.NewClientFromSecret(req.GetSecrets()) if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } meta, err := s3.GetFSMeta(bucketName, prefix) if err != nil { return nil, err } meta := getMeta(bucketName, prefix, req.VolumeContext) mounter, err := mounter.New(meta, s3.Config) if err != nil { return nil, err Loading Loading @@ -150,10 +169,8 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if err != nil { return nil, fmt.Errorf("failed to initialize S3 client: %s", err) } meta, err := client.GetFSMeta(bucketName, prefix) if err != nil { return nil, err } meta := getMeta(bucketName, prefix, req.VolumeContext) mounter, err := mounter.New(meta, client.Config) if err != nil { return nil, err Loading
pkg/s3/client.go +0 −34 Original line number Diff line number Diff line Loading @@ -3,11 +3,8 @@ package s3 import ( "bytes" "context" "encoding/json" "fmt" "io" "net/url" "path" "github.com/golang/glog" "github.com/minio/minio-go/v7" Loading Loading @@ -221,34 +218,3 @@ func (client *s3Client) removeObjectsOneByOne(bucketName, prefix string) error { return nil } func (client *s3Client) SetFSMeta(meta *FSMeta) error { b := new(bytes.Buffer) json.NewEncoder(b).Encode(meta) opts := minio.PutObjectOptions{ContentType: "application/json"} _, err := client.minio.PutObject( client.ctx, meta.BucketName, path.Join(meta.Prefix, metadataName), b, int64(b.Len()), opts, ) return err } func (client *s3Client) GetFSMeta(bucketName, prefix string) (*FSMeta, error) { opts := minio.GetObjectOptions{} obj, err := client.minio.GetObject(client.ctx, bucketName, path.Join(prefix, metadataName), opts) if err != nil { return &FSMeta{}, err } objInfo, err := obj.Stat() if err != nil { return &FSMeta{}, err } b := make([]byte, objInfo.Size) _, err = obj.Read(b) if err != nil && err != io.EOF { return &FSMeta{}, err } var meta FSMeta err = json.Unmarshal(b, &meta) return &meta, err }