seaweedfs-csi-driver/pkg/driver/controllerserver.go
2020-10-12 14:05:56 -07:00

205 lines
6.7 KiB
Go

package driver
import (
"context"
"crypto/sha1"
"encoding/hex"
"fmt"
"io"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ControllerServer implements the CSI controller service. Its methods
// delegate request validation, capability lookup and filer access to the
// SeaweedFsDriver it wraps.
type ControllerServer struct {
// Driver is the driver instance the controller methods operate on.
Driver *SeaweedFsDriver
}
// Compile-time check that *ControllerServer satisfies csi.ControllerServer.
var _ csi.ControllerServer = (*ControllerServer)(nil)
// CreateVolume handles the CSI CreateVolume RPC.
//
// The volume ID is the request name passed through sanitizeVolumeId. The
// volume is materialized by calling filer_pb.Mkdir for that ID under
// "/buckets". The request parameters are echoed back as the volume context
// and the requested capacity is reported as the volume capacity.
//
// It returns codes.InvalidArgument when the name or the volume capabilities
// are missing, or when the required capacity is below 1 GiB (which includes
// an unset or negative capacity). Errors from the driver's capability check
// are returned unchanged; a failing Mkdir is returned as a plain error.
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	volumeId := sanitizeVolumeId(req.GetName())

	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
		glog.V(3).Infof("invalid create volume req: %v", req)
		return nil, err
	}

	// Check arguments
	if volumeId == "" {
		return nil, status.Error(codes.InvalidArgument, "Name missing in request")
	}
	if len(req.GetVolumeCapabilities()) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
	}

	params := req.GetParameters()
	// Infof, not Info: the message contains a formatting verb.
	glog.V(4).Infof("params:%v", params)

	capacity := req.GetCapacityRange().GetRequiredBytes()
	// Compare with <= 0 rather than == 0: shifting a negative int64 right
	// keeps it negative, so a negative size would otherwise slip through.
	if capacityGB := capacity >> 30; capacityGB <= 0 {
		return nil, status.Error(codes.InvalidArgument, "required bytes less than 1GB")
	}

	if err := filer_pb.Mkdir(cs.Driver, "/buckets", volumeId, nil); err != nil {
		return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
	}

	glog.V(4).Infof("create volume %s", volumeId)

	return &csi.CreateVolumeResponse{
		Volume: &csi.Volume{
			VolumeId:      volumeId,
			CapacityBytes: capacity, // echo the requested size back to the caller
			VolumeContext: params,
		},
	}, nil
}
// DeleteVolume handles the CSI DeleteVolume RPC by calling filer_pb.Remove
// for the volume's entry under "/buckets".
//
// It returns codes.InvalidArgument when the volume ID is empty. Errors from
// the driver's capability check are returned unchanged; a failing Remove is
// returned as a plain error that names the volume.
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
	volumeId := req.GetVolumeId()

	// Check arguments
	if volumeId == "" {
		return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
	}

	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
		glog.V(3).Infof("Invalid delete volume req: %v", req)
		return nil, err
	}

	glog.V(4).Infof("Deleting volume %s", volumeId)

	// NOTE(review): the boolean flags are positional; confirm their meaning
	// against the filer_pb.Remove signature before changing them.
	if err := filer_pb.Remove(cs.Driver, "/buckets", volumeId, true, true, true, false, nil); err != nil {
		return nil, fmt.Errorf("Error deleting bucket %s: %v", volumeId, err)
	}

	return &csi.DeleteVolumeResponse{}, nil
}
// ControllerPublishVolume performs no work: it always answers with an empty
// response and a nil error.
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
	resp := new(csi.ControllerPublishVolumeResponse)
	return resp, nil
}
// ControllerUnpublishVolume performs no work: it always answers with an
// empty response and a nil error.
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
	resp := new(csi.ControllerUnpublishVolumeResponse)
	return resp, nil
}
// ValidateVolumeCapabilities handles the CSI ValidateVolumeCapabilities RPC.
//
// All argument checks run before the filer is contacted. It returns
// codes.InvalidArgument when the volume ID or the capability list is
// missing or empty, and codes.NotFound when filer_pb.Exists reports no
// entry for the volume under "/buckets". A failing existence check is
// returned as a plain error.
//
// If any requested capability asks for an access mode other than
// SINGLE_NODE_WRITER, the response carries only an explanatory Message.
// Otherwise the capabilities are confirmed when isValidVolumeCapabilities
// accepts them against the driver's access modes; if it does not, an empty
// response is returned.
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
	// Check arguments
	volumeId := req.GetVolumeId()
	if volumeId == "" {
		return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
	}
	volCaps := req.GetVolumeCapabilities()
	// len() covers both a nil and an empty, non-nil list.
	if len(volCaps) == 0 {
		return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
	}

	exists, err := filer_pb.Exists(cs.Driver, "/buckets", volumeId, true)
	if err != nil {
		return nil, fmt.Errorf("Error checking bucket %s exists: %v", volumeId, err)
	}
	if !exists {
		// return an error if the volume requested does not exist
		return nil, status.Error(codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", volumeId))
	}

	// We currently only support RWO
	for _, volCap := range volCaps {
		if volCap.GetAccessMode().GetMode() != csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER {
			return &csi.ValidateVolumeCapabilitiesResponse{Message: "Only single node writer is supported"}, nil
		}
	}

	var confirmed *csi.ValidateVolumeCapabilitiesResponse_Confirmed
	if isValidVolumeCapabilities(cs.Driver.vcap, volCaps) {
		confirmed = &csi.ValidateVolumeCapabilitiesResponse_Confirmed{VolumeCapabilities: volCaps}
	}

	return &csi.ValidateVolumeCapabilitiesResponse{
		Confirmed: confirmed,
	}, nil
}
// ListVolumes is not supported; every call fails with codes.Unimplemented.
func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
	err := status.Error(codes.Unimplemented, "")
	return nil, err
}
// GetCapacity is not supported; every call fails with codes.Unimplemented.
func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
	err := status.Error(codes.Unimplemented, "")
	return nil, err
}
// ControllerGetCapabilities handles the CSI ControllerGetCapabilities RPC.
// It reports the controller service capabilities stored on the driver
// (cs.Driver.cscap) and never fails.
func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
	glog.V(3).Infof("Using default ControllerGetCapabilities")

	resp := &csi.ControllerGetCapabilitiesResponse{}
	resp.Capabilities = cs.Driver.cscap
	return resp, nil
}
// CreateSnapshot is not supported; every call fails with
// codes.Unimplemented.
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
	err := status.Error(codes.Unimplemented, "")
	return nil, err
}
// DeleteSnapshot is not supported; every call fails with
// codes.Unimplemented.
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
	err := status.Error(codes.Unimplemented, "")
	return nil, err
}
// ListSnapshots is not supported; every call fails with
// codes.Unimplemented.
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
	err := status.Error(codes.Unimplemented, "")
	return nil, err
}
// ControllerExpandVolume is not supported; every call fails with
// codes.Unimplemented.
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
	err := status.Error(codes.Unimplemented, "")
	return nil, err
}
// sanitizeVolumeId normalizes a requested volume name into a volume ID.
// The name is lower-cased; if the lower-cased form is longer than 63 bytes
// it is replaced by the hex-encoded SHA-1 digest of that form (40
// characters). Shorter names, including the empty string, are returned
// lower-cased and otherwise unchanged.
func sanitizeVolumeId(volumeId string) string {
	lowered := strings.ToLower(volumeId)
	if len(lowered) <= 63 {
		return lowered
	}

	digest := sha1.New()
	// hash.Hash documents that Write never returns an error.
	_, _ = io.WriteString(digest, lowered)
	return hex.EncodeToString(digest.Sum(nil))
}
// isValidVolumeCapabilities reports whether every capability in volCaps
// requests an access mode that appears in driverVolumeCaps.
//
// An empty volCaps yields true. Access modes are read through the generated
// nil-safe getters, so a nil element or a capability without an access mode
// is compared using the getter's zero mode instead of panicking.
func isValidVolumeCapabilities(driverVolumeCaps []*csi.VolumeCapability_AccessMode, volCaps []*csi.VolumeCapability) bool {
	for _, requested := range volCaps {
		mode := requested.GetAccessMode().GetMode()

		supported := false
		for _, offered := range driverVolumeCaps {
			if offered.GetMode() == mode {
				supported = true
				break
			}
		}
		// One unsupported capability decides the answer; stop scanning.
		if !supported {
			return false
		}
	}
	return true
}