package driver import ( "context" "crypto/sha1" "encoding/hex" "fmt" "io" "strings" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type ControllerServer struct { Driver *SeaweedFsDriver } var _ = csi.ControllerServer(&ControllerServer{}) func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { volumeId := sanitizeVolumeId(req.GetName()) if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { glog.V(3).Infof("invalid create volume req: %v", req) return nil, err } // Check arguments if volumeId == "" { return nil, status.Error(codes.InvalidArgument, "Name missing in request") } if req.GetVolumeCapabilities() == nil { return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request") } params := req.GetParameters() glog.V(4).Info("params:%v", params) capacity := req.GetCapacityRange().GetRequiredBytes() capacityGB := capacity >> 30 if capacityGB == 0 { return nil, status.Error(codes.InvalidArgument, "required bytes less than 1GB") } seaweedFsVolumeCount := capacityGB / 30 if seaweedFsVolumeCount == 0 { seaweedFsVolumeCount = 1 } if err := filer_pb.Mkdir(cs.Driver, "/buckets", volumeId, nil); err != nil { return nil, fmt.Errorf("Error setting bucket metadata: %v", err) } glog.V(4).Infof("create volume %s", volumeId) return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ VolumeId: volumeId, CapacityBytes: capacity, // 0, // seaweedFsVolumeCount * 1024 * 1024 * 30, VolumeContext: params, }, }, nil } func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volumeId := req.VolumeId // Check arguments if volumeId == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { glog.V(3).Infof("Invalid delete volume req: %v", req) return nil, err } glog.V(4).Infof("Deleting volume %s", volumeId) if err := filer_pb.Remove(cs.Driver, "/buckets", volumeId, true, true, true, false, nil); err != nil { return nil, fmt.Errorf("Error setting bucket metadata: %v", err) } return &csi.DeleteVolumeResponse{}, nil } func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { return &csi.ControllerPublishVolumeResponse{}, nil } func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { return &csi.ControllerUnpublishVolumeResponse{}, nil } func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { // Check arguments if req.GetVolumeId() == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } if req.GetVolumeCapabilities() == nil { return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request") } exists, err := filer_pb.Exists(cs.Driver, "/buckets", req.GetVolumeId(), true) if err != nil { return nil, fmt.Errorf("Error checking bucket %s exists: %v", req.GetVolumeId(), err) } if !exists { // return an error if the volume requested does not exist return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", req.GetVolumeId())) } // We currently only support RWO supportedAccessMode := &csi.VolumeCapability_AccessMode{ Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, } for _, cap := range req.VolumeCapabilities { if cap.GetAccessMode().GetMode() != supportedAccessMode.GetMode() { return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil } } volCaps := req.GetVolumeCapabilities() if len(volCaps) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume capabilities not provided") } var confirmed *csi.ValidateVolumeCapabilitiesResponse_Confirmed if isValidVolumeCapabilities(cs.Driver.vcap, volCaps) { confirmed = &csi.ValidateVolumeCapabilitiesResponse_Confirmed{VolumeCapabilities: volCaps} } return &csi.ValidateVolumeCapabilitiesResponse{ Confirmed: confirmed, }, nil } func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { return nil, status.Error(codes.Unimplemented, "") } func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { return nil, status.Error(codes.Unimplemented, "") } // ControllerGetCapabilities implements the default GRPC callout. // Default supports all capabilities func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { glog.V(3).Infof("Using default ControllerGetCapabilities") return &csi.ControllerGetCapabilitiesResponse{ Capabilities: cs.Driver.cscap, }, nil } func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { return nil, status.Error(codes.Unimplemented, "") } func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { return nil, status.Error(codes.Unimplemented, "") } func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { return nil, status.Error(codes.Unimplemented, "") } func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { return nil, status.Error(codes.Unimplemented, "") } func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { return nil, status.Error(codes.Unimplemented, "") } func sanitizeVolumeId(volumeId string) string { volumeId = strings.ToLower(volumeId) if len(volumeId) > 63 { h := sha1.New() io.WriteString(h, volumeId) volumeId = hex.EncodeToString(h.Sum(nil)) } return volumeId } func isValidVolumeCapabilities(driverVolumeCaps []*csi.VolumeCapability_AccessMode, volCaps []*csi.VolumeCapability) bool { hasSupport := func(cap *csi.VolumeCapability) bool { for _, c := range driverVolumeCaps { if c.GetMode() == cap.AccessMode.GetMode() { return true } } return false } foundAll := true for _, c := range volCaps { if !hasSupport(c) { foundAll = false } } return foundAll }