commit b21fb2e2b62dd9ec3cf13403e87687229424f1e0 Author: Chris Lu Date: Thu Mar 19 02:59:45 2020 -0700 WIP diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d132cb8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ + +*.iml diff --git a/cmd/seaweedfs-csi-driver/main.go b/cmd/seaweedfs-csi-driver/main.go new file mode 100644 index 0000000..3919052 --- /dev/null +++ b/cmd/seaweedfs-csi-driver/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + + "github.com/seaweedfs/seaweedfs-csi-driver/pkg/driver" +) + +var ( + endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint") + nodeID = flag.String("nodeid", "", "node id") + version = flag.Bool("version", false, "Print the version and exit.") +) + +func main() { + + flag.Parse() + + if *version { + info, err := driver.GetVersionJSON() + if err != nil { + log.Fatalln(err.Error()) + } + fmt.Println(info) + os.Exit(0) + } + + drv := driver.NewSeaweedFsDriver(*nodeID, *endpoint) + drv.Run() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3cb854e --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/seaweedfs/seaweedfs-csi-driver + +go 1.14 + +require ( + github.com/container-storage-interface/spec v1.2.0 + github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b + golang.org/x/net v0.0.0-20200301022130-244492dfa37a + google.golang.org/grpc v1.28.0 + k8s.io/utils v0.0.0-20200318093247-d1ab8797c558 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5430653 --- /dev/null +++ b/go.sum @@ -0,0 +1,64 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/container-storage-interface/spec v1.2.0 h1:bD9KIVgaVKKkQ/UbVUY9kCaH/CJbhNxe0eeB4JeJV2s= +github.com/container-storage-interface/spec v1.2.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200301022130-244492dfa37a h1:GuSPYbZzB5/dcLNCwLQLsg3obCJtX9IJhpXkvY7kzk0= +golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.28.0 h1:bO/TA4OxCOummhSf10siHuG7vJOiwh7SpRpFZDkOgl4= +google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/klog v0.3.0 h1:0VPpR+sizsiivjIfIAQH/rl8tan6jvWkS7lU+0di3lE= +k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= +k8s.io/utils v0.0.0-20200318093247-d1ab8797c558 h1:yaUqfD7/dWM081lhpYGB+Wi4awn61TY7WcAc6usJMR8= +k8s.io/utils v0.0.0-20200318093247-d1ab8797c558/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go new file mode 100644 index 0000000..e176866 --- /dev/null +++ b/pkg/driver/controllerserver.go @@ -0,0 +1,104 @@ +package driver + +import ( + "context" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type ControllerServer struct { + Driver *SeaweedFsDriver +} + +var _ = csi.ControllerServer(&ControllerServer{}) + +func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { + + volumeId := req.GetName() + params := req.GetParameters() + glog.V(4).Info("params:%v", params) + capacity := req.GetCapacityRange().GetRequiredBytes() + capacityGB := capacity >> 30 + if capacityGB == 0 { + return nil, status.Error(codes.InvalidArgument, "required bytes less than 1GB") + } + seaweedFsVolumeCount := capacityGB / 30 + if seaweedFsVolumeCount == 0 { + seaweedFsVolumeCount = 1 + } + + err := cs.Driver.createBucket(volumeId, int(seaweedFsVolumeCount)) + + return &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + VolumeId: volumeId, + CapacityBytes: 0, // seaweedFsVolumeCount * 1024 * 1024 * 30, + VolumeContext: params, + }, + }, err +} + +func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + + volumeId := req.VolumeId + err := cs.Driver.deleteBucket(volumeId) + + return &csi.DeleteVolumeResponse{}, err +} + +func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { + + return &csi.ValidateVolumeCapabilitiesResponse{ + Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{ + VolumeContext: req.GetVolumeContext(), + VolumeCapabilities: req.GetVolumeCapabilities(), + Parameters: req.GetParameters(), + }, + }, nil + +} + +func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +// ControllerGetCapabilities implements the default GRPC callout. +// Default supports all capabilities +func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { + glog.V(5).Infof("Using default ControllerGetCapabilities") + + return &csi.ControllerGetCapabilitiesResponse{ + Capabilities: cs.Driver.cscap, + }, nil +} + +func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go new file mode 100644 index 0000000..d090613 --- /dev/null +++ b/pkg/driver/driver.go @@ -0,0 +1,102 @@ +package driver + +import ( + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" +) + +const ( + driverName = "csi.seaweedfs.com" +) + +var ( + version = "1.0.0-rc1" +) + +type SeaweedFsDriver struct { + name string + nodeID string + version string + + endpoint string + + vcap []*csi.VolumeCapability_AccessMode + cscap []*csi.ControllerServiceCapability + + filer string + pathOnFiler string +} + +func NewSeaweedFsDriver(nodeID, endpoint string) *SeaweedFsDriver { + + glog.Infof("Driver: %v version: %v", driverName, version) + + n := &SeaweedFsDriver{ + endpoint: endpoint, + nodeID: nodeID, + name: driverName, + version: version, + } + + n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ + csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + }) + n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{ + csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + }) + + return n +} + +func NewNodeServer(n *SeaweedFsDriver) *NodeServer { + + return &NodeServer{ + Driver: n, + } +} + +func (n *SeaweedFsDriver) Run() { + s := NewNonBlockingGRPCServer() + s.Start(n.endpoint, + NewIdentityServer(n), + NewControllerServer(n), + NewNodeServer(n)) + s.Wait() +} + +func (n *SeaweedFsDriver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability_AccessMode { + var vca []*csi.VolumeCapability_AccessMode + for _, c := range vc { + glog.Infof("Enabling volume access mode: %v", c.String()) + vca = append(vca, &csi.VolumeCapability_AccessMode{Mode: c}) + } + n.vcap = vca + return vca +} + +func (n *SeaweedFsDriver) AddControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) { + var csc []*csi.ControllerServiceCapability + + for _, c := range cl { + glog.Infof("Enabling controller service capability: %v", c.String()) + csc = append(csc, NewControllerServiceCapability(c)) + } + + n.cscap = csc + + return +} + +func (n *SeaweedFsDriver) createBucket(volumeId string, seaweedFsVolumeCount int) error { + // TODO implement seaweedFsVolumeCount later + return nil +} +func (n *SeaweedFsDriver) deleteBucket(volumeId string) error { + return nil +} +func (n *SeaweedFsDriver) mount(source string, targetPath string) error { + return nil +} +func (n *SeaweedFsDriver) unmount(targetPath string) error { + return nil +} diff --git a/pkg/driver/identityserver.go b/pkg/driver/identityserver.go new file mode 100644 index 0000000..f92b225 --- /dev/null +++ b/pkg/driver/identityserver.go @@ -0,0 +1,49 @@ +package driver + +import ( + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" + "golang.org/x/net/context" +) + +type IdentityServer struct { + Driver *SeaweedFsDriver +} + +var _ = csi.IdentityServer(&IdentityServer{}) + +func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) { + + return &csi.GetPluginInfoResponse{ + Name: ids.Driver.name, + VendorVersion: ids.Driver.version, + }, nil +} + +func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) { + return &csi.ProbeResponse{}, nil +} + +func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { + glog.V(4).Infof("Using default capabilities") + return &csi.GetPluginCapabilitiesResponse{ + Capabilities: []*csi.PluginCapability{ + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, + }, + }, + }, + /* // TODO add later + { + Type: &csi.PluginCapability_VolumeExpansion_{ + VolumeExpansion: &csi.PluginCapability_VolumeExpansion{ + Type: csi.PluginCapability_VolumeExpansion_ONLINE, + }, + }, + }, + */ + }, + }, nil +} diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go new file mode 100644 index 0000000..abef4a4 --- /dev/null +++ b/pkg/driver/nodeserver.go @@ -0,0 +1,105 @@ +package driver + +import ( + "context" + "fmt" + "os" + "strings" + + "github.com/golang/glog" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/utils/mount" +) + +type NodeServer struct { + Driver *SeaweedFsDriver + mounter mount.Interface +} + +var _ = csi.NodeServer(&NodeServer{}) + +func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + // mount the fs here + targetPath := req.GetTargetPath() + + mo := req.GetVolumeCapability().GetMount().GetMountFlags() + if req.GetReadonly() { + mo = append(mo, "ro") + } + + source := fmt.Sprintf("%s%s", ns.Driver.filer, ns.Driver.pathOnFiler) + + err := ns.Driver.mount(source, targetPath) + + if err != nil { + if os.IsPermission(err) { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + if strings.Contains(err.Error(), "invalid argument") { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) + } + + return &csi.NodePublishVolumeResponse{}, nil +} + +func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { + targetPath := req.GetTargetPath() + + err := ns.Driver.unmount(targetPath) + + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + err = os.Remove(targetPath) + if err != nil && !os.IsNotExist(err) { + return nil, status.Error(codes.Internal, err.Error()) + } + + return &csi.NodeUnpublishVolumeResponse{}, nil +} + +func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { + glog.V(5).Infof("Using default NodeGetInfo") + + return &csi.NodeGetInfoResponse{ + NodeId: ns.Driver.nodeID, + }, nil +} + +func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { + glog.V(5).Infof("Using default NodeGetCapabilities") + + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: []*csi.NodeServiceCapability{ + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_UNKNOWN, + }, + }, + }, + }, + }, nil +} + +func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + return &csi.NodeUnstageVolumeResponse{}, nil +} + +func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + return &csi.NodeStageVolumeResponse{}, nil +} + +func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} diff --git a/pkg/driver/server.go b/pkg/driver/server.go new file mode 100644 index 0000000..64a9333 --- /dev/null +++ b/pkg/driver/server.go @@ -0,0 +1,96 @@ +package driver + +import ( + "net" + "os" + "sync" + + "github.com/golang/glog" + "google.golang.org/grpc" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +// Defines Non blocking GRPC server interfaces +type NonBlockingGRPCServer interface { + // Start services at the endpoint + Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) + // Waits for the service to stop + Wait() + // Stops the service gracefully + Stop() + // Stops the service forcefully + ForceStop() +} + +func NewNonBlockingGRPCServer() NonBlockingGRPCServer { + return &nonBlockingGRPCServer{} +} + +// NonBlocking server +type nonBlockingGRPCServer struct { + wg sync.WaitGroup + server *grpc.Server +} + +func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + + s.wg.Add(1) + + go s.serve(endpoint, ids, cs, ns) + + return +} + +func (s *nonBlockingGRPCServer) Wait() { + s.wg.Wait() +} + +func (s *nonBlockingGRPCServer) Stop() { + s.server.GracefulStop() +} + +func (s *nonBlockingGRPCServer) ForceStop() { + s.server.Stop() +} + +func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + + proto, addr, err := ParseEndpoint(endpoint) + if err != nil { + glog.Fatal(err.Error()) + } + + if proto == "unix" { + addr = "/" + addr + if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { + glog.Fatalf("Failed to remove %s, error: %s", addr, err.Error()) + } + } + + listener, err := net.Listen(proto, addr) + if err != nil { + glog.Fatalf("Failed to listen: %v", err) + } + + opts := []grpc.ServerOption{ + grpc.UnaryInterceptor(logGRPC), + } + server := grpc.NewServer(opts...) + s.server = server + + if ids != nil { + csi.RegisterIdentityServer(server, ids) + } + if cs != nil { + csi.RegisterControllerServer(server, cs) + } + if ns != nil { + csi.RegisterNodeServer(server, ns) + } + + glog.Infof("Listening for connections on address: %#v", listener.Addr()) + + server.Serve(listener) + +} \ No newline at end of file diff --git a/pkg/driver/utils.go b/pkg/driver/utils.go new file mode 100644 index 0000000..2334e27 --- /dev/null +++ b/pkg/driver/utils.go @@ -0,0 +1,52 @@ +package driver + +import ( + "fmt" + "strings" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +func NewIdentityServer(d *SeaweedFsDriver) *IdentityServer { + return &IdentityServer{ + Driver: d, + } +} + +func NewControllerServer(d *SeaweedFsDriver) *ControllerServer { + return &ControllerServer{ + Driver: d, + } +} + +func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability { + return &csi.ControllerServiceCapability{ + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: cap, + }, + }, + } +} + +func ParseEndpoint(ep string) (string, string, error) { + if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") { + s := strings.SplitN(ep, "://", 2) + if s[1] != "" { + return s[0], s[1], nil + } + } + return "", "", fmt.Errorf("Invalid endpoint: %v", ep) +} + +func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + glog.V(3).Infof("GRPC call: %s", info.FullMethod) + resp, err := handler(ctx, req) + if err != nil { + glog.Errorf("GRPC error: %v", err) + } + return resp, err +} diff --git a/pkg/driver/version.go b/pkg/driver/version.go new file mode 100644 index 0000000..258267a --- /dev/null +++ b/pkg/driver/version.go @@ -0,0 +1,53 @@ +/* +Copyright 2019 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package driver + +import ( + "encoding/json" + "fmt" + "runtime" +) + +var ( + driverVersion string + gitCommit string + buildDate string +) + +type VersionInfo struct { + DriverVersion string `json:"driverVersion"` + GitCommit string `json:"gitCommit"` + BuildDate string `json:"buildDate"` + GoVersion string `json:"goVersion"` + Compiler string `json:"compiler"` + Platform string `json:"platform"` +} + +func GetVersion() VersionInfo { + return VersionInfo{ + DriverVersion: driverVersion, + GitCommit: gitCommit, + BuildDate: buildDate, + GoVersion: runtime.Version(), + Compiler: runtime.Compiler, + Platform: fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH), + } +} +func GetVersionJSON() (string, error) { + info := GetVersion() + marshalled, err := json.MarshalIndent(&info, "", " ") + if err != nil { + return "", err + } + return string(marshalled), nil +}