From 77697d343356a59b649ffbcf42e927439033a413 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 1 Jun 2020 22:30:31 -0700 Subject: [PATCH] configure filer via CLI --- cmd/seaweedfs-csi-driver/main.go | 9 +-- deploy/kubernetes/provisioner.yaml | 102 +++++++++++++++++++++++++++++ pkg/driver/controllerserver.go | 9 +-- pkg/driver/driver.go | 40 +++++++++-- pkg/driver/mounter.go | 30 +-------- pkg/driver/mounter_seaweedfs.go | 4 +- pkg/driver/nodeserver.go | 3 +- 7 files changed, 149 insertions(+), 48 deletions(-) create mode 100644 deploy/kubernetes/provisioner.yaml diff --git a/cmd/seaweedfs-csi-driver/main.go b/cmd/seaweedfs-csi-driver/main.go index 559bad0..a654992 100644 --- a/cmd/seaweedfs-csi-driver/main.go +++ b/cmd/seaweedfs-csi-driver/main.go @@ -10,9 +10,10 @@ import ( ) var ( - endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint to accept gRPC calls") - nodeID = flag.String("nodeid", "", "node id") - version = flag.Bool("version", false, "Print the version and exit.") + filer = flag.String("filer", "localhost:8888", "filer server") + endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint to accept gRPC calls") + nodeID = flag.String("nodeid", "", "node id") + version = flag.Bool("version", false, "Print the version and exit.") ) func main() { @@ -28,6 +29,6 @@ func main() { os.Exit(0) } - drv := driver.NewSeaweedFsDriver(*nodeID, *endpoint) + drv := driver.NewSeaweedFsDriver(*filer, *nodeID, *endpoint) drv.Run() } diff --git a/deploy/kubernetes/provisioner.yaml b/deploy/kubernetes/provisioner.yaml new file mode 100644 index 0000000..226257e --- /dev/null +++ b/deploy/kubernetes/provisioner.yaml @@ -0,0 +1,102 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: csi-provisioner-sa + namespace: default +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: external-provisioner-runner +rules: + - apiGroups: [""] + resources: ["secrets"] + verbs: ["get", "list"] + - apiGroups: [""] + resources: ["persistentvolumes"] + verbs: ["get", "list", "watch", "create", "delete"] + - apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["get", "list", "watch", "update"] + - apiGroups: ["storage.k8s.io"] + resources: ["storageclasses"] + verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["events"] + verbs: ["list", "watch", "create", "update", "patch"] +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: csi-provisioner-role +subjects: + - kind: ServiceAccount + name: csi-provisioner-sa + namespace: default +roleRef: + kind: ClusterRole + name: external-provisioner-runner + apiGroup: rbac.authorization.k8s.io +--- +kind: Service +apiVersion: v1 +metadata: + name: csi-provisioner-seaweedfs + namespace: default + labels: + app: csi-provisioner-seaweedfs +spec: + selector: + app: csi-provisioner-seaweedfs + ports: + - name: dummy + port: 12345 +--- +kind: StatefulSet +apiVersion: apps/v1beta1 +metadata: + name: csi-provisioner-seaweedfs + namespace: default +spec: + serviceName: "csi-provisioner-seaweedfs" + replicas: 1 + template: + metadata: + labels: + app: csi-provisioner-seaweedfs + spec: + serviceAccount: csi-provisioner-sa + containers: + - name: csi-provisioner + image: quay.io/k8scsi/csi-provisioner:v1.1.0 + args: + - "--provisioner=seaweedfs-csi-driver" + - "--csi-address=$(ADDRESS)" + - "--v=4" + env: + - name: ADDRESS + value: /var/lib/kubelet/plugins/seaweedfs-csi-driver/csi.sock + imagePullPolicy: "IfNotPresent" + volumeMounts: + - name: socket-dir + mountPath: /var/lib/kubelet/plugins/seaweedfs-csi-driver + - name: csi-seaweedfs + image: seaweedfs/csi:v1.1.1 + args: + - "--endpoint=$(CSI_ENDPOINT)" + - "--nodeid=$(NODE_ID)" + - "--v=4" + env: + - name: CSI_ENDPOINT + value: unix:///var/lib/kubelet/plugins/seaweedfs-csi-driver/csi.sock + - name: NODE_ID + valueFrom: + fieldRef: + fieldPath: spec.nodeName + imagePullPolicy: "Always" + volumeMounts: + - name: socket-dir + mountPath: /var/lib/kubelet/plugins/seaweedfs-csi-driver + volumes: + - name: socket-dir + emptyDir: {} diff --git a/pkg/driver/controllerserver.go b/pkg/driver/controllerserver.go index c0eeed3..78466e2 100644 --- a/pkg/driver/controllerserver.go +++ b/pkg/driver/controllerserver.go @@ -50,8 +50,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol seaweedFsVolumeCount = 1 } - cfg := newConfigFromSecrets(req.GetSecrets()) - if err := filer_pb.Mkdir(cfg, "/buckets", volumeId, nil); err != nil { + if err := filer_pb.Mkdir(cs.Driver, "/buckets", volumeId, nil); err != nil { return nil, fmt.Errorf("Error setting bucket metadata: %v", err) } @@ -81,8 +80,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } glog.V(4).Infof("Deleting volume %s", volumeId) - cfg := newConfigFromSecrets(req.GetSecrets()) - if err := filer_pb.Remove(cfg, "/buckets", volumeId, true, true, true); err != nil { + if err := filer_pb.Remove(cs.Driver, "/buckets", volumeId, true, true, true); err != nil { return nil, fmt.Errorf("Error setting bucket metadata: %v", err) } @@ -107,8 +105,7 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request") } - cfg := newConfigFromSecrets(req.GetSecrets()) - exists, err := filer_pb.Exists(cfg, "/buckets", req.GetVolumeId(), true) + exists, err := filer_pb.Exists(cs.Driver, "/buckets", req.GetVolumeId(), true) if err != nil { return nil, fmt.Errorf("Error checking bucket %s exists: %v", req.GetVolumeId(), err) } diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index df46223..1261a86 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -4,12 +4,17 @@ import ( "fmt" "os" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/glog" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "k8s.io/klog" "k8s.io/client-go/rest" + "k8s.io/klog" ) const ( @@ -30,17 +35,21 @@ type SeaweedFsDriver struct { vcap []*csi.VolumeCapability_AccessMode cscap []*csi.ControllerServiceCapability + filer string + grpcDialOption grpc.DialOption } -func NewSeaweedFsDriver(nodeID, endpoint string) *SeaweedFsDriver { +func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver { glog.Infof("Driver: %v version: %v", driverName, version) n := &SeaweedFsDriver{ - endpoint: endpoint, - nodeID: nodeID, - name: driverName, - version: version, + endpoint: endpoint, + nodeID: nodeID, + name: driverName, + version: version, + filer: filer, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), } n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ @@ -106,3 +115,22 @@ func (d *SeaweedFsDriver) ValidateControllerServiceRequest(c csi.ControllerServi } return status.Error(codes.InvalidArgument, fmt.Sprintf("%s", c)) } + +var _ = filer_pb.FilerClient(&SeaweedFsDriver{}) + +func (d *SeaweedFsDriver) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(d.filer) + if parseErr != nil { + return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr) + } + + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, d.grpcDialOption) + +} +func (d *SeaweedFsDriver) AdjustedUrl(hostAndPort string) string { + return hostAndPort +} diff --git a/pkg/driver/mounter.go b/pkg/driver/mounter.go index e94544d..38dd1de 100644 --- a/pkg/driver/mounter.go +++ b/pkg/driver/mounter.go @@ -4,12 +4,7 @@ import ( "fmt" "time" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/glog" - "google.golang.org/grpc" "os/exec" "k8s.io/utils/mount" ) @@ -24,8 +19,8 @@ type Mounter interface { Mount(target string) error } -func newMounter(bucketName string, cfg *Config) (Mounter, error) { - return newSeaweedFsMounter(bucketName, cfg) +func newMounter(bucketName string, filer string) (Mounter, error) { + return newSeaweedFsMounter(bucketName, filer) } func fuseMount(path string, command string, args []string) error { @@ -64,24 +59,3 @@ func newConfigFromSecrets(secrets map[string]string) *Config { } return t } - -var _ = filer_pb.FilerClient(&Config{}) - -func (cfg *Config) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - - filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(cfg.Filer) - if parseErr != nil { - return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr) - } - - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, filerGrpcAddress, grpcDialOption) - -} -func (cfg *Config) AdjustedUrl(hostAndPort string) string { - return hostAndPort -} diff --git a/pkg/driver/mounter_seaweedfs.go b/pkg/driver/mounter_seaweedfs.go index 12fa1e7..4d448de 100644 --- a/pkg/driver/mounter_seaweedfs.go +++ b/pkg/driver/mounter_seaweedfs.go @@ -14,10 +14,10 @@ const ( seaweedFsCmd = "weed" ) -func newSeaweedFsMounter(bucketName string, cfg *Config) (Mounter, error) { +func newSeaweedFsMounter(bucketName string, filer string) (Mounter, error) { return &seaweedFsMounter{ bucketName: bucketName, - filerUrl: cfg.Filer, + filerUrl: filer, }, nil } diff --git a/pkg/driver/nodeserver.go b/pkg/driver/nodeserver.go index faf35ba..b58b23b 100644 --- a/pkg/driver/nodeserver.go +++ b/pkg/driver/nodeserver.go @@ -50,8 +50,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis mo = append(mo, "ro") } - cfg := newConfigFromSecrets(req.GetSecrets()) - mounter, err := newMounter(volumeID, cfg) + mounter, err := newMounter(volumeID, ns.Driver.filer) if err != nil { return nil, err }