本文主要分析csi-provisioner的源码,关于开发一个Dynamic Provisioner,具体可参考nfs-client-provisioner的源码分析

1. Dynamic Provisioner

1.1. Provisioner Interface

开发Dynamic Provisioner需要实现Provisioner接口,该接口有两个方法,分别是:

  • Provision:创建存储资源,并且返回一个PV对象。
  • Delete:移除对应的存储资源,但并没有删除PV对象。

1.2. 开发provisioner的步骤

  1. 写一个provisioner实现Provisioner接口(包含ProvisionDelete的方法)。
  2. 通过该provisioner构建ProvisionController
  3. 执行ProvisionControllerRun方法。

2. CSI Provisioner

CSI Provisioner的源码可参考:https://github.com/kubernetes-csi/external-provisioner。

2.1. Main 函数

2.1.1. 读取环境变量

源码如下:

var (
    provisioner          = flag.String("provisioner", "", "Name of the provisioner. The provisioner will only provision volumes for claims that request a StorageClass with a provisioner field set equal to this name.")
    master               = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
    kubeconfig           = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
    csiEndpoint          = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume")
    connectionTimeout    = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
    volumeNamePrefix     = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume")
    volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
    showVersion          = flag.Bool("version", false, "Show version.")

    provisionController *controller.ProvisionController
    version             = "unknown"
)

func init() {
    var config *rest.Config
    var err error

    flag.Parse()
    flag.Set("logtostderr", "true")

    if *showVersion {
        fmt.Println(os.Args[0], version)
        os.Exit(0)
    }
    glog.Infof("Version: %s", version)
    ...    
}

通过init函数解析相关参数,其实provisioner指明为PVC提供PV的provisioner的名字,需要和StorageClass对象中的provisioner字段一致。

2.1.2. 获取clientset对象

源码如下:

// get the KUBECONFIG from env if specified (useful for local/debug cluster)
kubeconfigEnv := os.Getenv("KUBECONFIG")
if kubeconfigEnv != "" {
    glog.Infof("Found KUBECONFIG environment variable set, using that..")
    kubeconfig = &kubeconfigEnv
}
if *master != "" || *kubeconfig != "" {
    glog.Infof("Either master or kubeconfig specified. building kube config from that..")
    config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
} else {
    glog.Infof("Building kube configs for running in cluster...")
    config, err = rest.InClusterConfig()
}
if err != nil {
    glog.Fatalf("Failed to create config: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
    glog.Fatalf("Failed to create client: %v", err)
}

// snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1alpha1Client
snapClient, err := snapclientset.NewForConfig(config)
if err != nil {
    glog.Fatalf("Failed to create snapshot client: %v", err)
}
csiAPIClient, err := csiclientset.NewForConfig(config)
if err != nil {
    glog.Fatalf("Failed to create CSI API client: %v", err)
}

通过读取对应的k8s的配置,创建clientset对象,用来执行k8s对应的API,其中主要包括对PV和PVC等对象的创建删除等操作。

2.1.3. k8s版本校验

// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
    glog.Fatalf("Error getting server version: %v", err)
}

获取了k8s的版本信息,因为provisioners的功能在k8s 1.5及以上版本才支持。

2.1.4. 连接 csi socket

// Generate a unique ID for this provisioner
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + *provisioner

// Provisioner will stay in Init until driver opens csi socket, once it's done
// controller will exit this loop and proceed normally.
socketDown := true
grpcClient := &grpc.ClientConn{}
for socketDown {
    grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
    if err == nil {
        socketDown = false
        continue
    }
    time.Sleep(10 * time.Second)
}

Provisioner会停留在初始化状态,直到csi socket连接成功才正常运行。如果连接失败,会暂停10秒后重试,其中涉及以下2个参数:

  • csiEndpoint:CSI Volume的gRPC地址,默认通过为/run/csi/socket
  • connectionTimeout:连接CSI driver socket的超时时间,默认为10秒。

2.1.5. 构造csi-Provisioner对象

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)
provisionController = controller.NewProvisionController(
    clientset,
    *provisioner,
    csiProvisioner,
    serverVersion.GitVersion,
)

通过参数clientset,csiAPIClient, csiEndpoint, connectionTimeout, identity, volumeNamePrefix, volumeNameUUIDLength,grpcClient, snapClient构造csi-Provisioner对象。

通过csiProvisioner构造ProvisionController对象。

2.1.6. 运行ProvisionController

func main() {
    provisionController.Run(wait.NeverStop)
}

ProvisionController实现了具体的PV和PVC的相关逻辑,Run方法以常驻进程的方式运行。

2.2. ProvisionDelete方法

2.2.1. Provision方法

csiProvisionerProvision方法具体源码参考:https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L336

Provision方法用来创建存储资源,并且返回一个PV对象。其中入参是VolumeOptions,用来指定PV对象的相关属性。

1、构造PV相关属性

pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
if err != nil {
    return nil, err
}

2、构造CSIPersistentVolumeSource相关属性

driverState, err := checkDriverState(p.grpcClient, p.timeout, needSnapshotSupport)
if err != nil {
    return nil, err
}

...
// Resolve controller publish, node stage, node publish secret references
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretNameKey, controllerPublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
    return nil, err
}
nodeStageSecretRef, err := getSecretReference(nodeStageSecretNameKey, nodeStageSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
    return nil, err
}
nodePublishSecretRef, err := getSecretReference(nodePublishSecretNameKey, nodePublishSecretNamespaceKey, options.Parameters, pvName, options.PVC)
if err != nil {
    return nil, err
}

...
volumeAttributes := map[string]string{provisionerIDKey: p.identity}
for k, v := range rep.Volume.Attributes {
    volumeAttributes[k] = v
}

...
fsType := ""
for k, v := range options.Parameters {
    switch strings.ToLower(k) {
    case "fstype":
        fsType = v
    }
}
if len(fsType) == 0 {
    fsType = defaultFSType
}

3、创建CSI CreateVolumeRequest

// Create a CSI CreateVolumeRequest and Response
req := csi.CreateVolumeRequest{
    Name:               pvName,
    Parameters:         options.Parameters,
    VolumeCapabilities: volumeCaps,
    CapacityRange: &csi.CapacityRange{
        RequiredBytes: int64(volSizeBytes),
    },
}
...
glog.V(5).Infof("CreateVolumeRequest %+v", req)

rep := &csi.CreateVolumeResponse{}
...
opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
err = wait.ExponentialBackoff(opts, func() (bool, error) {
    ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
    defer cancel()
    rep, err = p.csiClient.CreateVolume(ctx, &req)
    if err == nil {
        // CreateVolume has finished successfully
        return true, nil
    }

    if status, ok := status.FromError(err); ok {
        if status.Code() == codes.DeadlineExceeded {
            // CreateVolume timed out, give it another chance to complete
            glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
            return false, nil
        }
    }
    // CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
    return false, err
})

if err != nil {
    return nil, err
}

if rep.Volume != nil {
    glog.V(3).Infof("create volume rep: %+v", *rep.Volume)
}

respCap := rep.GetVolume().GetCapacityBytes()
if respCap < volSizeBytes {
    capErr := fmt.Errorf("created volume capacity %v less than requested capacity %v", respCap, volSizeBytes)
    delReq := &csi.DeleteVolumeRequest{
        VolumeId: rep.GetVolume().GetId(),
    }
    delReq.ControllerDeleteSecrets = provisionerCredentials
    ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
    defer cancel()
    _, err := p.csiClient.DeleteVolume(ctx, delReq)
    if err != nil {
        capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err)
    }
    return nil, capErr
}

Provison方法核心功能是调用p.csiClient.CreateVolume(ctx, &req)

4、构造PV对象

pv := &v1.PersistentVolume{
    ObjectMeta: metav1.ObjectMeta{
        Name: pvName,
    },
    Spec: v1.PersistentVolumeSpec{
        PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
        AccessModes:                   options.PVC.Spec.AccessModes,
        Capacity: v1.ResourceList{
            v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
        },
        // TODO wait for CSI VolumeSource API
        PersistentVolumeSource: v1.PersistentVolumeSource{
            CSI: &v1.CSIPersistentVolumeSource{
                Driver:                     driverState.driverName,
                VolumeHandle:               p.volumeIdToHandle(rep.Volume.Id),
                FSType:                     fsType,
                VolumeAttributes:           volumeAttributes,
                ControllerPublishSecretRef: controllerPublishSecretRef,
                NodeStageSecretRef:         nodeStageSecretRef,
                NodePublishSecretRef:       nodePublishSecretRef,
            },
        },
    },
}

if driverState.capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) {
    pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
}

glog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)

return pv, nil

Provision方法只是通过VolumeOptions参数来构建PV对象,并没有执行具体PV的创建或删除的操作。

不同类型的Provisioner的,一般是PersistentVolumeSource类型和参数不同,例如csi-provisioner对应的PersistentVolumeSourceCSI,并且需要传入CSI相关的参数:

  • Driver
  • VolumeHandle
  • FSType
  • VolumeAttributes
  • ControllerPublishSecretRef
  • NodeStageSecretRef
  • NodePublishSecretRef

2.2.2. Delete方法

csiProvisionerdelete方法具体源码参考:https://github.com/kubernetes-csi/external-provisioner/blob/master/pkg/controller/controller.go#L606

func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
    if volume == nil || volume.Spec.CSI == nil {
        return fmt.Errorf("invalid CSI PV")
    }
    volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

    _, err := checkDriverState(p.grpcClient, p.timeout, false)
    if err != nil {
        return err
    }

    req := csi.DeleteVolumeRequest{
        VolumeId: volumeId,
    }
    // get secrets if StorageClass specifies it
    storageClassName := volume.Spec.StorageClassName
    if len(storageClassName) != 0 {
        if storageClass, err := p.client.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{}); err == nil {
            // Resolve provision secret credentials.
            // No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
            provisionerSecretRef, err := getSecretReference(provisionerSecretNameKey, provisionerSecretNamespaceKey, storageClass.Parameters, volume.Name, nil)
            if err != nil {
                return err
            }
            credentials, err := getCredentials(p.client, provisionerSecretRef)
            if err != nil {
                return err
            }
            req.ControllerDeleteSecrets = credentials
        }

    }
    ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
    defer cancel()

    _, err = p.csiClient.DeleteVolume(ctx, &req)

    return err
}

Delete方法主要是调用了p.csiClient.DeleteVolume(ctx, &req)方法。

2.3. 总结

csi provisioner实现了Provisioner接口,其中包含ProvisonDelete两个方法:

  • Provision:调用csiClient.CreateVolume方法,同时构造并返回PV对象。
  • Delete:调用csiClient.DeleteVolume方法。

csi provisioner的核心方法都调用了csi-client相关方法。

3. csi-client

csi client的相关代码参考:https://github.com/container-storage-interface/spec/blob/master/lib/go/csi/v0/csi.pb.go

3.1. 构造csi-client

3.1.1. 构造grpcClient

// Provisioner will stay in Init until driver opens csi socket, once it's done
// controller will exit this loop and proceed normally.
socketDown := true
grpcClient := &grpc.ClientConn{}
for socketDown {
    grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
    if err == nil {
        socketDown = false
        continue
    }
    time.Sleep(10 * time.Second)
}

通过连接csi socket,连接成功才构造可用的grpcClient

3.1.2. 构造csi-client

通过grpcClient构造csi-client

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient)

NewCSIProvisioner

// NewCSIProvisioner creates new CSI provisioner
func NewCSIProvisioner(client kubernetes.Interface,
    csiAPIClient csiclientset.Interface,
    csiEndpoint string,
    connectionTimeout time.Duration,
    identity string,
    volumeNamePrefix string,
    volumeNameUUIDLength int,
    grpcClient *grpc.ClientConn,
    snapshotClient snapclientset.Interface) controller.Provisioner {

    csiClient := csi.NewControllerClient(grpcClient)
    provisioner := &csiProvisioner{
        client:               client,
        grpcClient:           grpcClient,
        csiClient:            csiClient,
        csiAPIClient:         csiAPIClient,
        snapshotClient:       snapshotClient,
        timeout:              connectionTimeout,
        identity:             identity,
        volumeNamePrefix:     volumeNamePrefix,
        volumeNameUUIDLength: volumeNameUUIDLength,
    }
    return provisioner
}

NewControllerClient

csiClient := csi.NewControllerClient(grpcClient)
...
type controllerClient struct {
    cc *grpc.ClientConn
}

func NewControllerClient(cc *grpc.ClientConn) ControllerClient {
    return &controllerClient{cc}
}

3.2. csiClient.CreateVolume

csi provisoner中调用csiClient.CreateVolume代码如下:

opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
err = wait.ExponentialBackoff(opts, func() (bool, error) {
    ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
    defer cancel()
    rep, err = p.csiClient.CreateVolume(ctx, &req)
    if err == nil {
        // CreateVolume has finished successfully
        return true, nil
    }

    if status, ok := status.FromError(err); ok {
        if status.Code() == codes.DeadlineExceeded {
            // CreateVolume timed out, give it another chance to complete
            glog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
            return false, nil
        }
    }
    // CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
    return false, err
})

CreateVolumeRequest的构造:

// Create a CSI CreateVolumeRequest and Response
req := csi.CreateVolumeRequest{
    Name:               pvName,
    Parameters:         options.Parameters,
    VolumeCapabilities: volumeCaps,
    CapacityRange: &csi.CapacityRange{
        RequiredBytes: int64(volSizeBytes),
    },
}
...
req.VolumeContentSource = volumeContentSource
...
req.AccessibilityRequirements = requirements
...
req.ControllerCreateSecrets = provisionerCredentials

具体的Create实现方法如下:

其中csiClient是个接口类型

具体代码参考controllerClient.CreateVolume

func (c *controllerClient) CreateVolume(ctx context.Context, in *CreateVolumeRequest, opts ...grpc.CallOption) (*CreateVolumeResponse, error) {
    out := new(CreateVolumeResponse)
    err := grpc.Invoke(ctx, "/csi.v0.Controller/CreateVolume", in, out, c.cc, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

3.3. csiClient.DeleteVolume

csi provisoner中调用csiClient.DeleteVolume代码如下:

func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
    ...
    req := csi.DeleteVolumeRequest{
        VolumeId: volumeId,
    }
    // get secrets if StorageClass specifies it
    ...

    ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
    defer cancel()

    _, err = p.csiClient.DeleteVolume(ctx, &req)

    return err
}

DeleteVolumeRequest的构造:

req := csi.DeleteVolumeRequest{
    VolumeId: volumeId,
}
...
req.ControllerDeleteSecrets = credentials

将构造的DeleteVolumeRequest传给DeleteVolume方法。

具体的Delete实现方法如下:

具体代码参考:controllerClient.DeleteVolume

func (c *controllerClient) DeleteVolume(ctx context.Context, in *DeleteVolumeRequest, opts ...grpc.CallOption) (*DeleteVolumeResponse, error) {
    out := new(DeleteVolumeResponse)
    err := grpc.Invoke(ctx, "/csi.v0.Controller/DeleteVolume", in, out, c.cc, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

4. ProvisionController.Run

自定义的provisioner实现了Provisoner接口ProvisionDelete方法,这两个方法主要对后端存储做创建和删除操作,并没有对PV对象进行创建和删除操作。

PV对象的相关操作具体由ProvisionController中的provisionClaimOperationdeleteVolumeOperation具体执行,同时调用了具体provisionerProvisionDelete两个方法来对存储数据做处理。

func main() {
    provisionController.Run(wait.NeverStop)
}

这块代码逻辑可参考:nfs-client-provisioner 源码分析

参考文章:

Copyright © www.huweihuang.com all right reserved,powered by GitbookUpdated at 2023-10-29 11:11:39

results matching ""

    No results matching ""