configure filer via CLI

Chris Lu 2020-06-01 22:30:31 -07:00
parent 002acd516d
commit 77697d3433
7 changed files with 149 additions and 48 deletions

View File

@@ -10,9 +10,10 @@ import (
)
var (
endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint to accept gRPC calls")
nodeID = flag.String("nodeid", "", "node id")
version = flag.Bool("version", false, "Print the version and exit.")
filer = flag.String("filer", "localhost:8888", "filer server")
endpoint = flag.String("endpoint", "unix://tmp/seaweedfs-csi.sock", "CSI endpoint to accept gRPC calls")
nodeID = flag.String("nodeid", "", "node id")
version = flag.Bool("version", false, "Print the version and exit.")
)
func main() {
@@ -28,6 +29,6 @@ func main() {
os.Exit(0)
}
drv := driver.NewSeaweedFsDriver(*nodeID, *endpoint)
drv := driver.NewSeaweedFsDriver(*filer, *nodeID, *endpoint)
drv.Run()
}
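
The filer address now arrives as a command-line flag (default localhost:8888) and is threaded into the driver constructor, instead of being read from per-request CSI secrets later on. A minimal sketch of the rewired call, with an assumed import path for the driver package and placeholder argument values (not part of the commit):

	package main

	import (
		// import path is an assumption for illustration; use the module's actual path
		"github.com/seaweedfs/seaweedfs-csi-driver/pkg/driver"
	)

	func main() {
		// In the real binary these three values come from the -filer, -nodeid
		// and -endpoint flags defined in the diff above.
		drv := driver.NewSeaweedFsDriver(
			"localhost:8888", // filer HTTP address; the gRPC address is derived from it
			"node-1",
			"unix:///var/lib/kubelet/plugins/seaweedfs-csi-driver/csi.sock",
		)
		drv.Run()
	}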

View File

@@ -0,0 +1,102 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: csi-provisioner-sa
namespace: default
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: external-provisioner-runner
rules:
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["list", "watch", "create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: csi-provisioner-role
subjects:
- kind: ServiceAccount
name: csi-provisioner-sa
namespace: default
roleRef:
kind: ClusterRole
name: external-provisioner-runner
apiGroup: rbac.authorization.k8s.io
---
kind: Service
apiVersion: v1
metadata:
name: csi-provisioner-seaweedfs
namespace: default
labels:
app: csi-provisioner-seaweedfs
spec:
selector:
app: csi-provisioner-seaweedfs
ports:
- name: dummy
port: 12345
---
kind: StatefulSet
apiVersion: apps/v1beta1
metadata:
name: csi-provisioner-seaweedfs
namespace: default
spec:
serviceName: "csi-provisioner-seaweedfs"
replicas: 1
template:
metadata:
labels:
app: csi-provisioner-seaweedfs
spec:
serviceAccount: csi-provisioner-sa
containers:
- name: csi-provisioner
image: quay.io/k8scsi/csi-provisioner:v1.1.0
args:
- "--provisioner=seaweedfs-csi-driver"
- "--csi-address=$(ADDRESS)"
- "--v=4"
env:
- name: ADDRESS
value: /var/lib/kubelet/plugins/seaweedfs-csi-driver/csi.sock
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: socket-dir
mountPath: /var/lib/kubelet/plugins/seaweedfs-csi-driver
- name: csi-seaweedfs
image: seaweedfs/csi:v1.1.1
args:
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(NODE_ID)"
- "--v=4"
env:
- name: CSI_ENDPOINT
value: unix:///var/lib/kubelet/plugins/seaweedfs-csi-driver/csi.sock
- name: NODE_ID
valueFrom:
fieldRef:
fieldPath: spec.nodeName
imagePullPolicy: "Always"
volumeMounts:
- name: socket-dir
mountPath: /var/lib/kubelet/plugins/seaweedfs-csi-driver
volumes:
- name: socket-dir
emptyDir: {}

View File

@@ -50,8 +50,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
seaweedFsVolumeCount = 1
}
cfg := newConfigFromSecrets(req.GetSecrets())
if err := filer_pb.Mkdir(cfg, "/buckets", volumeId, nil); err != nil {
if err := filer_pb.Mkdir(cs.Driver, "/buckets", volumeId, nil); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
}
@@ -81,8 +80,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
glog.V(4).Infof("Deleting volume %s", volumeId)
cfg := newConfigFromSecrets(req.GetSecrets())
if err := filer_pb.Remove(cfg, "/buckets", volumeId, true, true, true); err != nil {
if err := filer_pb.Remove(cs.Driver, "/buckets", volumeId, true, true, true); err != nil {
return nil, fmt.Errorf("Error setting bucket metadata: %v", err)
}
@@ -107,8 +105,7 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
}
cfg := newConfigFromSecrets(req.GetSecrets())
exists, err := filer_pb.Exists(cfg, "/buckets", req.GetVolumeId(), true)
exists, err := filer_pb.Exists(cs.Driver, "/buckets", req.GetVolumeId(), true)
if err != nil {
return nil, fmt.Errorf("Error checking bucket %s exists: %v", req.GetVolumeId(), err)
}
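
The controller methods stop building a Config from req.GetSecrets() and instead hand cs.Driver straight to the filer_pb helpers; this works because the driver itself now implements filer_pb.FilerClient (see driver.go below). A hedged sketch of that pattern in one place, using placeholder names and only the helper signatures already visible in these diffs:

	package example // standalone sketch, not part of the commit

	import (
		"fmt"

		"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	)

	// bucketLifecycle shows that anything satisfying filer_pb.FilerClient can be
	// passed to Mkdir, Exists and Remove, exactly as cs.Driver is above.
	func bucketLifecycle(fc filer_pb.FilerClient, volumeId string) error {
		if err := filer_pb.Mkdir(fc, "/buckets", volumeId, nil); err != nil {
			return fmt.Errorf("creating bucket %s: %v", volumeId, err)
		}
		exists, err := filer_pb.Exists(fc, "/buckets", volumeId, true)
		if err != nil {
			return fmt.Errorf("checking bucket %s: %v", volumeId, err)
		}
		if !exists {
			return fmt.Errorf("bucket %s not visible after creation", volumeId)
		}
		// The three booleans mirror the DeleteVolume call above.
		return filer_pb.Remove(fc, "/buckets", volumeId, true, true, true)
	}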

View File

@@ -4,12 +4,17 @@ import (
"fmt"
"os"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
"k8s.io/client-go/rest"
"k8s.io/klog"
)
const (
@@ -30,17 +35,21 @@ type SeaweedFsDriver struct {
vcap []*csi.VolumeCapability_AccessMode
cscap []*csi.ControllerServiceCapability
filer string
grpcDialOption grpc.DialOption
}
func NewSeaweedFsDriver(nodeID, endpoint string) *SeaweedFsDriver {
func NewSeaweedFsDriver(filer, nodeID, endpoint string) *SeaweedFsDriver {
glog.Infof("Driver: %v version: %v", driverName, version)
n := &SeaweedFsDriver{
endpoint: endpoint,
nodeID: nodeID,
name: driverName,
version: version,
endpoint: endpoint,
nodeID: nodeID,
name: driverName,
version: version,
filer: filer,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
}
n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
@@ -106,3 +115,22 @@ func (d *SeaweedFsDriver) ValidateControllerServiceRequest(c csi.ControllerServi
}
return status.Error(codes.InvalidArgument, fmt.Sprintf("%s", c))
}
var _ = filer_pb.FilerClient(&SeaweedFsDriver{})
func (d *SeaweedFsDriver) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(d.filer)
if parseErr != nil {
return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr)
}
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, d.grpcDialOption)
}
func (d *SeaweedFsDriver) AdjustedUrl(hostAndPort string) string {
return hostAndPort
}
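
The blank assignment var _ = filer_pb.FilerClient(&SeaweedFsDriver{}) is a compile-time check that the driver satisfies the FilerClient interface (WithFilerClient plus AdjustedUrl). pb.ParseServerToGrpcAddress turns the filer's HTTP address into its gRPC address, and pb.WithCachedGrpcClient reuses one connection with the TLS option loaded once in the constructor. A hedged sketch of calling WithFilerClient directly for a raw RPC; the LookupDirectoryEntry RPC name and request fields are assumed from the seaweedfs filer proto, and the lookup values are placeholders:

	package example // standalone sketch, not part of the commit

	import (
		"context"

		"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
		"github.com/golang/glog"
	)

	// lookupBuckets issues one raw filer RPC through any FilerClient, e.g. the
	// SeaweedFsDriver above, borrowing its cached, TLS-configured gRPC connection.
	func lookupBuckets(fc filer_pb.FilerClient) error {
		return fc.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
			resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
				Directory: "/",       // parent directory
				Name:      "buckets", // entry to look up
			})
			if err != nil {
				return err
			}
			glog.V(4).Infof("filer entry: %+v", resp.Entry)
			return nil
		})
	}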

View File

@@ -4,12 +4,7 @@ import (
"fmt"
"time"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/glog"
"google.golang.org/grpc"
"os/exec"
"k8s.io/utils/mount"
)
@@ -24,8 +19,8 @@ type Mounter interface {
Mount(target string) error
}
func newMounter(bucketName string, cfg *Config) (Mounter, error) {
return newSeaweedFsMounter(bucketName, cfg)
func newMounter(bucketName string, filer string) (Mounter, error) {
return newSeaweedFsMounter(bucketName, filer)
}
func fuseMount(path string, command string, args []string) error {
@@ -64,24 +59,3 @@ func newConfigFromSecrets(secrets map[string]string) *Config {
}
return t
}
var _ = filer_pb.FilerClient(&Config{})
func (cfg *Config) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(cfg.Filer)
if parseErr != nil {
return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr)
}
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, grpcDialOption)
}
func (cfg *Config) AdjustedUrl(hostAndPort string) string {
return hostAndPort
}
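
With the filer configured on the driver, newMounter takes a plain filer address and the Config-based FilerClient implementation above becomes dead code, hence its removal. A small hedged sketch of the new call path, with placeholder bucket and target paths; it mirrors the NodePublishVolume change in the last file below and assumes it sits in the same package as NodeServer:

	// Sketch only, not part of the commit.
	func publishSketch(ns *NodeServer, volumeID, targetPath string) error {
		mounter, err := newMounter(volumeID, ns.Driver.filer) // bucket name + filer address
		if err != nil {
			return err
		}
		// Mount is expected to shell out to the "weed" FUSE mount
		// (seaweedFsCmd in the next file) for this bucket.
		return mounter.Mount(targetPath)
	}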

View File

@@ -14,10 +14,10 @@ const (
seaweedFsCmd = "weed"
)
func newSeaweedFsMounter(bucketName string, cfg *Config) (Mounter, error) {
func newSeaweedFsMounter(bucketName string, filer string) (Mounter, error) {
return &seaweedFsMounter{
bucketName: bucketName,
filerUrl: cfg.Filer,
filerUrl: filer,
}, nil
}

View File

@@ -50,8 +50,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
mo = append(mo, "ro")
}
cfg := newConfigFromSecrets(req.GetSecrets())
mounter, err := newMounter(volumeID, cfg)
mounter, err := newMounter(volumeID, ns.Driver.filer)
if err != nil {
return nil, err
}