This article analyzes the source code of csi-provisioner. For how to develop a Dynamic Provisioner in general, see the source code analysis of nfs-client-provisioner.
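1. Dynamic Provisioner
1.1. Provisioner Interface
To develop a Dynamic Provisioner, you implement the Provisioner interface, which has two methods:
Provision: creates the storage resource and returns a PV object.
Delete: removes the corresponding storage resource, but does not delete the PV object.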
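To make the two-method contract concrete, here is a minimal sketch. It uses simplified local types instead of the real VolumeOptions and v1.PersistentVolume (the field names are assumptions), and memProvisioner is a hypothetical in-memory provisioner used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// VolumeOptions is a simplified stand-in for the library's VolumeOptions;
// the fields are assumptions chosen for this sketch.
type VolumeOptions struct {
	PVName        string
	CapacityBytes int64
}

// PersistentVolume is a simplified stand-in for v1.PersistentVolume.
type PersistentVolume struct {
	Name          string
	CapacityBytes int64
}

// Provisioner has the same two-method shape as the interface described above.
type Provisioner interface {
	// Provision creates the backing storage and returns a PV object describing it.
	Provision(options VolumeOptions) (*PersistentVolume, error)
	// Delete removes the backing storage; it does not delete the PV object.
	Delete(pv *PersistentVolume) error
}

// memProvisioner is a hypothetical provisioner whose "storage" is a map.
type memProvisioner struct {
	volumes map[string]int64
}

func (m *memProvisioner) Provision(options VolumeOptions) (*PersistentVolume, error) {
	if options.PVName == "" {
		return nil, errors.New("empty PV name")
	}
	m.volumes[options.PVName] = options.CapacityBytes
	return &PersistentVolume{Name: options.PVName, CapacityBytes: options.CapacityBytes}, nil
}

func (m *memProvisioner) Delete(pv *PersistentVolume) error {
	if _, ok := m.volumes[pv.Name]; !ok {
		return fmt.Errorf("volume %s not found", pv.Name)
	}
	delete(m.volumes, pv.Name)
	return nil
}

func main() {
	var p Provisioner = &memProvisioner{volumes: map[string]int64{}}
	pv, err := p.Provision(VolumeOptions{PVName: "pvc-123", CapacityBytes: 1 << 30})
	if err != nil {
		panic(err)
	}
	fmt.Println(pv.Name)      // pvc-123
	fmt.Println(p.Delete(pv)) // <nil>
}
```

Returning the PV object instead of saving it keeps the provisioner free of any API-server logic; persisting the object is left to the controller.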
1.2. Steps to develop a provisioner
Write a provisioner that implements the Provisioner interface (its Provision and Delete methods).
Build a ProvisionController from that provisioner.
Call the ProvisionController's Run method.
2. CSI Provisioner
For the CSI Provisioner source code, see: https://github.com/kubernetes-csi/external-provisioner.
2.1.1. Reading the command-line flags
The source is as follows:
```go
var (
	provisioner          = flag.String("provisioner", "", "Name of the provisioner. The provisioner will only provision volumes for claims that request a StorageClass with a provisioner field set equal to this name.")
	master               = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
	kubeconfig           = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
	csiEndpoint          = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume")
	connectionTimeout    = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
	volumeNamePrefix     = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume")
	volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
	showVersion          = flag.Bool("version", false, "Show version.")

	provisionController *controller.ProvisionController
	version             = "unknown"
)

func init() {
	var config *rest.Config
	var err error

	flag.Parse()
	flag.Set("logtostderr", "true")

	if *showVersion {
		fmt.Println(os.Args[0], version)
		os.Exit(0)
	}
	glog.Infof("Version: %s", version)
	...
}
```
The init function parses these flags. Among them, provisioner is the name of the provisioner that supplies PVs for PVCs; it must equal the provisioner field of the StorageClass object.
2.1.2. Getting the clientset object
The source is as follows:
```go
kubeconfigEnv := os.Getenv("KUBECONFIG")
if kubeconfigEnv != "" {
	glog.Infof("Found KUBECONFIG environment variable set, using that..")
	kubeconfig = &kubeconfigEnv
}

if *master != "" || *kubeconfig != "" {
	glog.Infof("Either master or kubeconfig specified. building kube config from that..")
	config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
} else {
	glog.Infof("Building kube configs for running in cluster...")
	config, err = rest.InClusterConfig()
}
if err != nil {
	glog.Fatalf("Failed to create config: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
	glog.Fatalf("Failed to create client: %v", err)
}
snapClient, err := snapclientset.NewForConfig(config)
if err != nil {
	glog.Fatalf("Failed to create snapshot client: %v", err)
}
csiAPIClient, err := csiclientset.NewForConfig(config)
if err != nil {
	glog.Fatalf("Failed to create CSI API client: %v", err)
}
```
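The code reads the Kubernetes configuration and creates the clientset object, which is used to call the Kubernetes API, mainly to create and delete objects such as PVs and PVCs. The same config is also used to build the snapshot client (snapClient) and the CSI API client (csiAPIClient).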
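The config-selection order above can be reduced to a pure function. This sketch, assuming we only want to see which branch is taken, returns a description string instead of calling clientcmd.BuildConfigFromFlags or rest.InClusterConfig; configSource is a hypothetical name.

```go
package main

import "fmt"

// configSource reproduces only the decision order of the snippet above:
// KUBECONFIG overrides --kubeconfig; if master or kubeconfig is set, the
// config is built from flags; otherwise the in-cluster config is used.
func configSource(master, kubeconfigFlag, kubeconfigEnv string) string {
	kubeconfig := kubeconfigFlag
	if kubeconfigEnv != "" {
		kubeconfig = kubeconfigEnv
	}
	if master != "" || kubeconfig != "" {
		return fmt.Sprintf("flags(master=%q, kubeconfig=%q)", master, kubeconfig)
	}
	return "in-cluster"
}

func main() {
	fmt.Println(configSource("", "", ""))                 // in-cluster
	fmt.Println(configSource("", "/etc/kube/config", "")) // flags(master="", kubeconfig="/etc/kube/config")
}
```

In practice this means a provisioner deployed as a Pod needs neither flag, while one run from a workstation needs at least one of them.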
2.1.3. Kubernetes version check
```go
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
	glog.Fatalf("Error getting server version: %v", err)
}
```
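This fetches the Kubernetes server version, because provisioners are only supported on Kubernetes 1.5 and later. The resulting serverVersion.GitVersion is later passed to NewProvisionController.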
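A minimum-version check against a GitVersion string could look like the following. This is a hypothetical helper (atLeast is not part of csi-provisioner), assuming GitVersion has the usual "vMAJOR.MINOR.PATCH" shape; anything after the minor number is ignored, and a non-digit suffix on the minor (such as "11+") is stripped.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// atLeast reports whether a GitVersion such as "v1.11.3" is >= major.minor.
func atLeast(gitVersion string, major, minor int) (bool, error) {
	parts := strings.SplitN(strings.TrimPrefix(gitVersion, "v"), ".", 3)
	if len(parts) < 2 {
		return false, fmt.Errorf("unexpected version %q", gitVersion)
	}
	maj, err := strconv.Atoi(parts[0])
	if err != nil {
		return false, fmt.Errorf("bad major in %q: %v", gitVersion, err)
	}
	// Keep only the leading digits of the minor component.
	minorDigits := strings.TrimRightFunc(parts[1], func(r rune) bool { return r < '0' || r > '9' })
	mnr, err := strconv.Atoi(minorDigits)
	if err != nil {
		return false, fmt.Errorf("bad minor in %q: %v", gitVersion, err)
	}
	if maj != major {
		return maj > major, nil
	}
	return mnr >= minor, nil
}

func main() {
	ok, err := atLeast("v1.11.3", 1, 5)
	fmt.Println(ok, err) // true <nil>
}
```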
2.1.4. Connecting to the CSI socket
```go
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + *provisioner

// Block until the CSI driver socket is reachable, retrying every 10 seconds.
socketDown := true
grpcClient := &grpc.ClientConn{}
for socketDown {
	grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
	if err == nil {
		socketDown = false
		continue
	}
	time.Sleep(10 * time.Second)
}
```
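Before connecting, the code builds an identity string from the current Unix time in milliseconds, a random number below 10000, and the provisioner name.
The Provisioner stays in its initialization phase until it connects to the CSI socket successfully. After each failed attempt it pauses for 10 seconds and retries. Two parameters are involved:
csiEndpoint: the gRPC address of the CSI volume driver, /run/csi/socket by default.
connectionTimeout: the timeout for connecting to the CSI driver socket, 10 seconds by default.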
2.1.5. Constructing the csi-Provisioner object
```go
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
provisionController = controller.NewProvisionController(
	clientset,
	*provisioner,
	csiProvisioner,
	serverVersion.GitVersion,
)
```
The csi-Provisioner object is constructed from clientset, csiAPIClient, csiEndpoint, connectionTimeout, identity, volumeNamePrefix, volumeNameUUIDLength, grpcClient, and snapClient.
csiProvisioner is then used to construct the ProvisionController object.
2.1.6. Running the ProvisionController
```go
func main() {
	provisionController.Run(wait.NeverStop)
}
```
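ProvisionController implements the actual PV and PVC handling logic. Its Run method runs as a long-lived loop; because wait.NeverStop is a channel that is never closed, Run does not return.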
2.2.1. The Provision method
For the source code of csiProvisioner's Provision method, see: https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L336
The Provision method creates the storage resource and returns a PV object. Its input is VolumeOptions, which specifies the attributes of the PV object.
1. Build the PV-related attributes (the PV name)
```go
pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
if err != nil {
	return nil, err
}
```
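The volume-name-prefix and volume-name-uuid-length flags suggest how the PV name is formed. Below is a sketch of a makeVolumeName-like helper, assuming the name is "<prefix>-<PVC UID>" and that truncation first removes the dashes from the UID; the upstream helper may differ in details such as bounds checking.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// makeVolumeName builds "<prefix>-<PVC UID>". When uuidLength >= 0, the UID's
// dashes are removed and it is cut to at most uuidLength characters
// (an assumption of this sketch).
func makeVolumeName(prefix, pvcUID string, uuidLength int) (string, error) {
	if prefix == "" {
		return "", errors.New("volume name prefix cannot be empty")
	}
	if pvcUID == "" {
		return "", errors.New("PVC object is missing a UID")
	}
	if uuidLength < 0 {
		// -1, the flag default, means: do not truncate.
		return fmt.Sprintf("%s-%s", prefix, pvcUID), nil
	}
	id := strings.ReplaceAll(pvcUID, "-", "")
	if uuidLength < len(id) {
		id = id[:uuidLength]
	}
	return fmt.Sprintf("%s-%s", prefix, id), nil
}

func main() {
	name, _ := makeVolumeName("pvc", "8d3f2a1c-1b2c-4d5e-9f00-123456789abc", -1)
	fmt.Println(name) // pvc-8d3f2a1c-1b2c-4d5e-9f00-123456789abc
	short, _ := makeVolumeName("pvc", "8d3f2a1c-1b2c-4d5e-9f00-123456789abc", 8)
	fmt.Println(short) // pvc-8d3f2a1c
}
```

Deriving the name from the PVC UID makes it deterministic, so a retried Provision call asks the driver for the same volume name again.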
2. Build the CSIPersistentVolumeSource attributes
```go
driverState, err := checkDriverState(p.grpcClient, p.timeout, needSnapshotSupport)
if err != nil {
	return nil, err
}
...
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretNameKey, controllerPublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
	return nil, err
}
nodeStageSecretRef, err := getSecretReference(nodeStageSecretNameKey, nodeStageSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
	return nil, err
}
nodePublishSecretRef, err := getSecretReference(nodePublishSecretNameKey, nodePublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
	return nil, err
}
...
volumeAttributes := map[string]string{provisionerIDKey: p.identity}
for k, v := range rep.Volume.Attributes {
	volumeAttributes[k] = v
}
...
fsType := ""
for k, v := range options.Parameters {
	switch strings.ToLower(k) {
	case "fstype":
		fsType = v
	}
}
if len(fsType) == 0 {
	fsType = defaultFSType
}
```
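The fsType lookup and the attribute merge above are plain map logic and can be isolated. In this sketch, "ext4" as defaultFSType and the key name passed to mergeAttributes are assumptions for illustration; check the source for the actual constants.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultFSType is an assumed value for this sketch.
const defaultFSType = "ext4"

// pickFSType looks up the "fstype" StorageClass parameter case-insensitively
// and falls back to defaultFSType, mirroring the loop above.
func pickFSType(params map[string]string) string {
	fsType := ""
	for k, v := range params {
		if strings.ToLower(k) == "fstype" {
			fsType = v
		}
	}
	if fsType == "" {
		return defaultFSType
	}
	return fsType
}

// mergeAttributes starts from the provisioner identity and copies in the
// attributes returned by the CSI driver; driver keys win on a collision.
func mergeAttributes(identityKey, identity string, driverAttrs map[string]string) map[string]string {
	out := map[string]string{identityKey: identity}
	for k, v := range driverAttrs {
		out[k] = v
	}
	return out
}

func main() {
	fmt.Println(pickFSType(map[string]string{"fsType": "xfs"})) // xfs
	fmt.Println(pickFSType(map[string]string{}))                // ext4
	attrs := mergeAttributes("provisioner-id", "1543-42-csi-hostpath", map[string]string{"share": "/exports/a"})
	fmt.Println(len(attrs)) // 2
}
```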
3. Create the CSI CreateVolumeRequest
```go
req := csi.CreateVolumeRequest{
	Name:               pvName,
	Parameters:         options.Parameters,
	VolumeCapabilities: volumeCaps,
	CapacityRange: &csi.CapacityRange{
		RequiredBytes: int64(volSizeBytes),
	},
}
...
glog.V(5).Infof("CreateVolumeRequest %+v", req)

rep := &csi.CreateVolumeResponse{}
...
opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
err = wait.ExponentialBackoff(opts, func() (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()
	rep, err = p.csiClient.CreateVolume(ctx, &req)
	if err == nil {
		return true, nil
	}
	if status, ok := status.FromError(err); ok {
		if status.Code() == codes.DeadlineExceeded {
			glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
			return false, nil
		}
	}
	return false, err
})

if err != nil {
	return nil, err
}

if rep.Volume != nil {
	glog.V(3).Infof("create volume rep: %+v", *rep.Volume)
}

respCap := rep.GetVolume().GetCapacityBytes()
if respCap < volSizeBytes {
	capErr := fmt.Errorf("created volume capacity %v less than requested capacity %v", respCap, volSizeBytes)
	delReq := &csi.DeleteVolumeRequest{
		VolumeId: rep.GetVolume().GetId(),
	}
	delReq.ControllerDeleteSecrets = provisionerCredentials
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()
	_, err := p.csiClient.DeleteVolume(ctx, delReq)
	if err != nil {
		capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err)
	}
	return nil, capErr
}
```
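The core of the Provision method is the call to p.csiClient.CreateVolume(ctx, &req). The call is wrapped in wait.ExponentialBackoff and retried only when it fails with DeadlineExceeded; if the driver returns a volume smaller than requested, the provisioner deletes that volume and returns an error.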
4. Build the PV object
```go
pv := &v1.PersistentVolume{
	ObjectMeta: metav1.ObjectMeta{
		Name: pvName,
	},
	Spec: v1.PersistentVolumeSpec{
		PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
		AccessModes:                   options.PVC.Spec.AccessModes,
		Capacity: v1.ResourceList{
			v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
		},
		PersistentVolumeSource: v1.PersistentVolumeSource{
			CSI: &v1.CSIPersistentVolumeSource{
				Driver:                     driverState.driverName,
				VolumeHandle:               p.volumeIdToHandle(rep.Volume.Id),
				FSType:                     fsType,
				VolumeAttributes:           volumeAttributes,
				ControllerPublishSecretRef: controllerPublishSecretRef,
				NodeStageSecretRef:         nodeStageSecretRef,
				NodePublishSecretRef:       nodePublishSecretRef,
			},
		},
	},
}

if driverState.capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) {
	pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
}

glog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)

return pv, nil
```
The Provision method only builds the PV object from the VolumeOptions parameter; it does not itself create or delete the PV object in the API server.
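Two details of the code above depend on respCap: the capacity written into the PV, and the guard that cleans up an undersized volume. The sketch below isolates both. roundUpGiB is a hypothetical helper that rounds a byte count up to whole GiB; the real bytesToGiQuantity may format small sizes differently. checkCapacity mirrors the respCap < volSizeBytes branch, with cleanup standing in for the DeleteVolume call.

```go
package main

import "fmt"

const giB = int64(1) << 30

// roundUpGiB rounds a byte count up to whole GiB and formats it as a
// Kubernetes-style quantity string.
func roundUpGiB(bytes int64) string {
	return fmt.Sprintf("%dGi", (bytes+giB-1)/giB)
}

// checkCapacity returns nil when the created volume is large enough.
// Otherwise it runs cleanup and returns an error, noting when cleanup
// failed and the volume is left orphaned.
func checkCapacity(requested, created int64, cleanup func() error) error {
	if created >= requested {
		return nil
	}
	capErr := fmt.Errorf("created volume capacity %v less than requested capacity %v", created, requested)
	if err := cleanup(); err != nil {
		return fmt.Errorf("%v. Cleanup failed, volume is orphaned: %v", capErr, err)
	}
	return capErr
}

func main() {
	fmt.Println(roundUpGiB(5 * giB / 2)) // 3Gi
	err := checkCapacity(10*giB, 8*giB, func() error { return nil })
	fmt.Println(err)
}
```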
Different types of Provisioner generally differ in the PersistentVolumeSource type and its parameters. For csi-provisioner, the PersistentVolumeSource is CSI, and the following CSI-related parameters must be filled in:
Driver
VolumeHandle
FSType
VolumeAttributes
ControllerPublishSecretRef
NodeStageSecretRef
NodePublishSecretRef
2.2.2. The Delete method
For the source code of csiProvisioner's Delete method, see: https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L606
```go
func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
	if volume == nil || volume.Spec.CSI == nil {
		return fmt.Errorf("invalid CSI PV")
	}
	volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

	_, err := checkDriverState(p.grpcClient, p.timeout, false)
	if err != nil {
		return err
	}

	req := csi.DeleteVolumeRequest{
		VolumeId: volumeId,
	}

	storageClassName := volume.Spec.StorageClassName
	if len(storageClassName) != 0 {
		if storageClass, err := p.client.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{}); err == nil {
			provisionerSecretRef, err := getSecretReference(provisionerSecretNameKey, provisionerSecretNamespaceKey, storageClass.Parameters, volume.Name, nil)
			if err != nil {
				return err
			}
			credentials, err := getCredentials(p.client, provisionerSecretRef)
			if err != nil {
				return err
			}
			req.ControllerDeleteSecrets = credentials
		}
	}
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()

	_, err = p.csiClient.DeleteVolume(ctx, &req)

	return err
}
```
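The Delete method mainly calls p.csiClient.DeleteVolume(ctx, &req). If the PV has a StorageClass that can still be fetched, the provisioner secret referenced by its parameters is resolved and passed as ControllerDeleteSecrets.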
2.3. Summary
csi provisioner implements the Provisioner interface, which contains two methods, Provision and Delete:
Provision: calls csiClient.CreateVolume, then builds and returns the PV object.
Delete: calls csiClient.DeleteVolume.
The core methods of csi provisioner all delegate to the corresponding csi-client methods.
3. csi-client
For the csi client code, see: https://github.com/container-storage-interface/spec/blob/master/lib/go/csi/v0/csi.pb.go
3.1. Constructing the csi-client
3.1.1. Constructing grpcClient
```go
socketDown := true
grpcClient := &grpc.ClientConn{}
for socketDown {
	grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
	if err == nil {
		socketDown = false
		continue
	}
	time.Sleep(10 * time.Second)
}
```
grpcClient is obtained by connecting to the CSI socket; a usable grpcClient exists only once the connection succeeds.
3.1.2. Constructing the csi-client
The csi-client is built from grpcClient.
```go
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
```
NewCSIProvisioner
```go
func NewCSIProvisioner(client kubernetes.Interface,
	csiAPIClient csiclientset.Interface,
	csiEndpoint string,
	connectionTimeout time.Duration,
	identity string,
	volumeNamePrefix string,
	volumeNameUUIDLength int,
	grpcClient *grpc.ClientConn,
	snapshotClient snapclientset.Interface) controller.Provisioner {

	csiClient := csi.NewControllerClient(grpcClient)
	provisioner := &csiProvisioner{
		client:               client,
		grpcClient:           grpcClient,
		csiClient:            csiClient,
		csiAPIClient:         csiAPIClient,
		snapshotClient:       snapshotClient,
		timeout:              connectionTimeout,
		identity:             identity,
		volumeNamePrefix:     volumeNamePrefix,
		volumeNameUUIDLength: volumeNameUUIDLength,
	}
	return provisioner
}
```
NewControllerClient
```go
csiClient := csi.NewControllerClient(grpcClient)
...
type controllerClient struct {
	cc *grpc.ClientConn
}

func NewControllerClient(cc *grpc.ClientConn) ControllerClient {
	return &controllerClient{cc}
}
```
3.2. csiClient.CreateVolume
The code in csi provisioner that calls csiClient.CreateVolume is as follows:
```go
opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
err = wait.ExponentialBackoff(opts, func() (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()
	rep, err = p.csiClient.CreateVolume(ctx, &req)
	if err == nil {
		return true, nil
	}
	if status, ok := status.FromError(err); ok {
		if status.Code() == codes.DeadlineExceeded {
			glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
			return false, nil
		}
	}
	return false, err
})
```
Construction of the CreateVolumeRequest:
```go
req := csi.CreateVolumeRequest{
	Name:               pvName,
	Parameters:         options.Parameters,
	VolumeCapabilities: volumeCaps,
	CapacityRange: &csi.CapacityRange{
		RequiredBytes: int64(volSizeBytes),
	},
}
...
req.VolumeContentSource = volumeContentSource
...
req.AccessibilityRequirements = requirements
...
req.ControllerCreateSecrets = provisionerCredentials
```
The concrete CreateVolume implementation follows. csiClient is an interface type (ControllerClient); the implementation is controllerClient.CreateVolume:
```go
func (c *controllerClient) CreateVolume(ctx context.Context, in *CreateVolumeRequest, opts ...grpc.CallOption) (*CreateVolumeResponse, error) {
	out := new(CreateVolumeResponse)
	err := grpc.Invoke(ctx, "/csi.v0.Controller/CreateVolume", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
```
3.3. csiClient.DeleteVolume
The code in csi provisioner that calls csiClient.DeleteVolume is as follows:
```go
func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
	...
	req := csi.DeleteVolumeRequest{
		VolumeId: volumeId,
	}
	...
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()

	_, err = p.csiClient.DeleteVolume(ctx, &req)

	return err
}
```
Construction of the DeleteVolumeRequest:
```go
req := csi.DeleteVolumeRequest{
	VolumeId: volumeId,
}
...
req.ControllerDeleteSecrets = credentials
```
The constructed DeleteVolumeRequest is passed to the DeleteVolume method.
The concrete DeleteVolume implementation is controllerClient.DeleteVolume:
```go
func (c *controllerClient) DeleteVolume(ctx context.Context, in *DeleteVolumeRequest, opts ...grpc.CallOption) (*DeleteVolumeResponse, error) {
	out := new(DeleteVolumeResponse)
	err := grpc.Invoke(ctx, "/csi.v0.Controller/DeleteVolume", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
```
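A custom provisioner implements the Provision and Delete methods of the Provisioner interface. These two methods mainly create and delete the backend storage; they do not create or delete PV objects.
The PV objects themselves are created and deleted by provisionClaimOperation and deleteVolumeOperation in ProvisionController, which in turn call the provisioner's Provision and Delete methods to handle the storage.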
```go
func main() {
	provisionController.Run(wait.NeverStop)
}
```
For this part of the logic, see the nfs-client-provisioner source code analysis.
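The division of labor between provisioner and controller can be sketched with fake types: the provisioner only touches the backend, while the controller-side functions persist or remove the PV object. provisionClaim, deleteVolume, fakeAPI, and storageOnly are hypothetical names loosely modeled on provisionClaimOperation and deleteVolumeOperation; they are not the library's code.

```go
package main

import (
	"errors"
	"fmt"
)

type volumeOptions struct{ PVName string }
type persistentVolume struct{ Name string }

type provisioner interface {
	Provision(volumeOptions) (*persistentVolume, error)
	Delete(*persistentVolume) error
}

// fakeAPI stands in for the API server's store of PV objects.
type fakeAPI struct{ pvs map[string]*persistentVolume }

// provisionClaim: the provisioner creates storage and returns a PV object;
// the controller is the one that saves it.
func provisionClaim(api *fakeAPI, p provisioner, opts volumeOptions) error {
	pv, err := p.Provision(opts)
	if err != nil {
		return err
	}
	api.pvs[pv.Name] = pv // the real controller calls the Kubernetes API here
	return nil
}

// deleteVolume: delete the backend storage first, then the PV object.
func deleteVolume(api *fakeAPI, p provisioner, name string) error {
	pv, ok := api.pvs[name]
	if !ok {
		return fmt.Errorf("PV %s not found", name)
	}
	if err := p.Delete(pv); err != nil {
		return err // in this sketch the PV is kept so deletion can be retried
	}
	delete(api.pvs, name)
	return nil
}

// storageOnly is a hypothetical provisioner that tracks backend volumes only.
type storageOnly struct{ backend map[string]bool }

func (s *storageOnly) Provision(o volumeOptions) (*persistentVolume, error) {
	s.backend[o.PVName] = true
	return &persistentVolume{Name: o.PVName}, nil
}

func (s *storageOnly) Delete(pv *persistentVolume) error {
	if !s.backend[pv.Name] {
		return errors.New("no such backend volume")
	}
	delete(s.backend, pv.Name)
	return nil
}

func main() {
	api := &fakeAPI{pvs: map[string]*persistentVolume{}}
	s := &storageOnly{backend: map[string]bool{}}
	fmt.Println(provisionClaim(api, s, volumeOptions{PVName: "pvc-1"})) // <nil>
	fmt.Println(len(api.pvs), len(s.backend))                          // 1 1
	fmt.Println(deleteVolume(api, s, "pvc-1"))                          // <nil>
	fmt.Println(len(api.pvs), len(s.backend))                          // 0 0
}
```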
References: