[源码分析] nfs-client-provisioner源码分析

Posted by 胡伟煌 on 2018-06-24

如果要开发一个Dynamic Provisioner,需要使用到the helper library

1. Dynamic Provisioner

1.1. Provisioner Interface

开发Dynamic Provisioner需要实现Provisioner接口,该接口有两个方法,分别是:

  • Provision:创建存储资源,并且返回一个PV对象。
  • Delete:移除对应的存储资源,但并没有删除PV对象。

Provisioner 接口源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Provisioner is an interface that creates templates for PersistentVolumes
// and can create the volume as a new resource in the infrastructure provider.
// It can also remove the volume it created from the underlying storage
// provider.
type Provisioner interface {
// Provision creates a volume i.e. the storage asset and returns a PV object
// for the volume
Provision(VolumeOptions) (*v1.PersistentVolume, error)
// Delete removes the storage asset that was created by Provision backing the
// given PV. Does not delete the PV object itself.
//
// May return IgnoredError to indicate that the call has been ignored and no
// action taken.
Delete(*v1.PersistentVolume) error
}

1.2. VolumeOptions

Provisioner接口的Provision方法的入参是一个VolumeOptions对象。VolumeOptions对象包含了创建PV对象所需要的信息,例如:PV的回收策略,PV的名字,PV所对应的PVC对象以及PVC的StorageClass对象使用的参数等。

VolumeOptions 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// VolumeOptions contains option information about a volume
// https://github.com/kubernetes/kubernetes/blob/release-1.4/pkg/volume/plugins.go
type VolumeOptions struct {
// Reclamation policy for a persistent volume
PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy
// PV.Name of the appropriate PersistentVolume. Used to generate cloud
// volume name.
PVName string

// PV mount options. Not validated - mount of the PVs will simply fail if one is invalid.
MountOptions []string

// PVC is reference to the claim that lead to provisioning of a new PV.
// Provisioners *must* create a PV that would be matched by this PVC,
// i.e. with required capacity, accessMode, labels matching PVC.Selector and
// so on.
PVC *v1.PersistentVolumeClaim
// Volume provisioning parameters from StorageClass
Parameters map[string]string

// Node selected by the scheduler for the volume.
SelectedNode *v1.Node
// Topology constraint parameter from StorageClass
AllowedTopologies []v1.TopologySelectorTerm
}

1.3. ProvisionController

ProvisionController是一个给PVC提供PV的控制器,具体执行Provisioner接口的ProvisionDelete的方法的所有逻辑。

1.4. 开发provisioner的步骤

  1. 写一个provisioner实现Provisioner接口(包含ProvisionDelete的方法)。
  2. 通过该provisioner构建ProvisionController
  3. 执行ProvisionControllerRun方法。

2. NFS Client Provisioner

nfs-client-provisioner是一个automatic provisioner,使用NFS作为存储,自动创建PV和对应的PVC,本身不提供NFS存储,需要外部先有一套NFS存储服务。

  • PV以 ${namespace}-${pvcName}-${pvName}的命名格式提供(在NFS服务器上)
  • PV回收的时候以 archieved-${namespace}-${pvcName}-${pvName} 的命名格式(在NFS服务器上)

以下通过nfs-client-provisioner的源码分析来说明开发自定义provisioner整个过程。nfs-client-provisioner的主要代码都在provisioner.go的文件中。

nfs-client-provisioner源码地址:https://github.com/kubernetes-incubator/external-storage/tree/master/nfs-client

2.1. Main函数

2.1.1. 读取环境变量

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
flag.Parse()
flag.Set("logtostderr", "true")

server := os.Getenv("NFS_SERVER")
if server == "" {
glog.Fatal("NFS_SERVER not set")
}
path := os.Getenv("NFS_PATH")
if path == "" {
glog.Fatal("NFS_PATH not set")
}
provisionerName := os.Getenv(provisionerNameKey)
if provisionerName == "" {
glog.Fatalf("environment variable %s is not set! Please set it.", provisionerNameKey)
}
...
}

main函数先获取NFS_SERVERNFS_PATHPROVISIONER_NAME三个环境变量的值,因此在部署nfs-client-provisioner的时候,需要将这三个环境变量的值传入。

  • NFS_SERVER:NFS服务端的IP地址。
  • NFS_PATH:NFS服务端设置的共享目录
  • PROVISIONER_NAME:provisioner的名字,需要和StorageClass对象中的provisioner字段一致。

例如StorageClass对象的yaml文件如下:

1
2
3
4
5
6
7
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: managed-nfs-storage
provisioner: fuseim.pri/ifs # or choose another name, must match deployment's env PROVISIONER_NAME'
parameters:
archiveOnDelete: "false" # When set to "false" your PVs will not be archived by the provisioner upon deletion of the PVC.

2.1.2. 获取clientset对象

源码如下:

1
2
3
4
5
6
7
8
9
10
// Create an InClusterConfig and use it to create a client for the controller
// to use to communicate with Kubernetes
config, err := rest.InClusterConfig()
if err != nil {
glog.Fatalf("Failed to create config: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Fatalf("Failed to create client: %v", err)
}

通过读取对应的k8s的配置,创建clientset对象,用来执行k8s对应的API,其中主要包括对PV和PVC等对象的创建删除等操作。

2.1.3. 构造nfsProvisioner对象

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
glog.Fatalf("Error getting server version: %v", err)
}

clientNFSProvisioner := &nfsProvisioner{
client: clientset,
server: server,
path: path,
}

通过clientsetserverpath等值构造nfsProvisioner对象,同时还获取了k8s的版本信息,因为provisioners的功能在k8s 1.5及以上版本才支持。

nfsProvisioner类型定义如下:

1
2
3
4
5
6
7
type nfsProvisioner struct {
client kubernetes.Interface
server string
path string
}

var _ controller.Provisioner = &nfsProvisioner{}

nfsProvisioner是一个自定义的provisioner,用来实现Provisioner的接口,其中的属性除了serverpath这两个关于NFS相关的参数,还包含了client,主要用来调用k8s的API。

1
var _ controller.Provisioner = &nfsProvisioner{}

以上用法用来检测nfsProvisioner是否实现了Provisioner的接口。

2.1.4. 构建并运行ProvisionController

源码如下:

1
2
3
4
// Start the provision controller which will dynamically provision efs NFS
// PVs
pc := controller.NewProvisionController(clientset, provisionerName, clientNFSProvisioner, serverVersion.GitVersion)
pc.Run(wait.NeverStop)

通过nfsProvisioner构造ProvisionController对象并执行Run方法,ProvisionController实现了具体的PV和PVC的相关逻辑,Run方法以常驻进程的方式运行。

2.2. ProvisionDelete方法

2.2.1. Provision方法

nfsProvisionerProvision方法具体源码参考:https://github.com/kubernetes-incubator/external-storage/blob/master/nfs-client/cmd/nfs-client-provisioner/provisioner.go#L56

Provision方法用来创建存储资源,并且返回一个PV对象。其中入参是VolumeOptions,用来指定PV对象的相关属性。

1、构建PV和PVC的名称

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (p *nfsProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
if options.PVC.Spec.Selector != nil {
return nil, fmt.Errorf("claim Selector is not supported")
}
glog.V(4).Infof("nfs provisioner: VolumeOptions %v", options)

pvcNamespace := options.PVC.Namespace
pvcName := options.PVC.Name

pvName := strings.Join([]string{pvcNamespace, pvcName, options.PVName}, "-")

fullPath := filepath.Join(mountPath, pvName)
glog.V(4).Infof("creating path %s", fullPath)
if err := os.MkdirAll(fullPath, 0777); err != nil {
return nil, errors.New("unable to create directory to provision new pv: " + err.Error())
}
os.Chmod(fullPath, 0777)

path := filepath.Join(p.path, pvName)
...
}

通过VolumeOptions的入参,构建PV和PVC的名称,以及创建路径path。

2、构造PV对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: options.PVName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
AccessModes: options.PVC.Spec.AccessModes,
MountOptions: options.MountOptions,
Capacity: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)],
},
PersistentVolumeSource: v1.PersistentVolumeSource{
NFS: &v1.NFSVolumeSource{
Server: p.server,
Path: path,
ReadOnly: false,
},
},
},
}
return pv, nil

综上可以看出,Provision方法只是通过VolumeOptions参数来构建PV对象,并没有执行具体PV的创建或删除的操作。

不同类型的Provisioner的,一般是PersistentVolumeSource类型和参数不同,例如nfs-provisioner对应的PersistentVolumeSourceNFS,并且需要传入NFS相关的参数:ServerPath等。

2.2.2. Delete方法

nfsProvisionerdelete方法具体源码参考:https://github.com/kubernetes-incubator/external-storage/blob/master/nfs-client/cmd/nfs-client-provisioner/provisioner.go#L99

1、获取pvName和path等相关参数

1
2
3
4
5
6
7
8
9
10
func (p *nfsProvisioner) Delete(volume *v1.PersistentVolume) error {
path := volume.Spec.PersistentVolumeSource.NFS.Path
pvName := filepath.Base(path)
oldPath := filepath.Join(mountPath, pvName)
if _, err := os.Stat(oldPath); os.IsNotExist(err) {
glog.Warningf("path %s does not exist, deletion skipped", oldPath)
return nil
}
...
}

通过pathpvName生成oldPath,其中oldPath是原先NFS服务器上pod对应的数据持久化存储路径。

2、获取archiveOnDelete参数并删除数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Get the storage class for this volume.
storageClass, err := p.getClassForVolume(volume)
if err != nil {
return err
}
// Determine if the "archiveOnDelete" parameter exists.
// If it exists and has a falsey value, delete the directory.
// Otherwise, archive it.
archiveOnDelete, exists := storageClass.Parameters["archiveOnDelete"]
if exists {
archiveBool, err := strconv.ParseBool(archiveOnDelete)
if err != nil {
return err
}
if !archiveBool {
return os.RemoveAll(oldPath)
}
}

如果storageClass对象中指定archiveOnDelete参数并且值为false,则会自动删除oldPath下的所有数据,即pod对应的数据持久化存储数据。

archiveOnDelete字面意思为删除时是否存档,false表示不存档,即删除数据,true表示存档,即重命名路径。

3、重命名旧数据路径

1
2
3
archivePath := filepath.Join(mountPath, "archived-"+pvName)
glog.V(4).Infof("archiving path %s to %s", oldPath, archivePath)
return os.Rename(oldPath, archivePath)

如果storageClass对象中没有指定archiveOnDelete参数或者值为true,表明需要删除时存档,即将oldPath重命名,命名格式为oldPath前面增加archived-的前缀。

3. ProvisionController

3.1. ProvisionController结构体

源码具体参考:https://github.com/kubernetes-incubator/external-storage/blob/master/lib/controller/controller.go#L82

ProvisionController是一个给PVC提供PV的控制器,具体执行Provisioner接口的ProvisionDelete的方法的所有逻辑。

3.1.1. 入参

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// ProvisionController is a controller that provisions PersistentVolumes for
// PersistentVolumeClaims.
type ProvisionController struct {
client kubernetes.Interface

// The name of the provisioner for which this controller dynamically
// provisions volumes. The value of annDynamicallyProvisioned and
// annStorageProvisioner to set & watch for, respectively
provisionerName string

// The provisioner the controller will use to provision and delete volumes.
// Presumably this implementer of Provisioner carries its own
// volume-specific options and such that it needs in order to provision
// volumes.
provisioner Provisioner

// Kubernetes cluster server version:
// * 1.4: storage classes introduced as beta. Technically out-of-tree dynamic
// provisioning is not officially supported, though it works
// * 1.5: storage classes stay in beta. Out-of-tree dynamic provisioning is
// officially supported
// * 1.6: storage classes enter GA
kubeVersion *utilversion.Version
...
}

clientprovisionerNameprovisionerkubeVersion等属性作为NewProvisionController的入参。

  • client:clientset客户端,用来调用k8s的API。
  • provisionerName:provisioner的名字,需要和StorageClass对象中的provisioner字段一致。
  • provisioner:具体的provisioner的实现者,本文为nfsProvisioner
  • kubeVersion:k8s的版本信息。

3.1.2. Controller和Informer

1
2
3
4
5
6
7
8
9
10
11
12
13
type ProvisionController struct {
...
claimInformer cache.SharedInformer
claims cache.Store
claimController cache.Controller
volumeInformer cache.SharedInformer
volumes cache.Store
volumeController cache.Controller
classInformer cache.SharedInformer
classes cache.Store
classController cache.Controller
...
}

ProvisionController结构体中包含了PVPVCStorageClass三个对象的ControllerInformerStore,主要用来执行这三个对象的相关操作。

  • Controller:通用的控制框架
  • Informer:消息通知器
  • Store:通用的对象存储接口

3.1.3. workqueue

1
2
3
4
5
6
type ProvisionController struct {
...
claimQueue workqueue.RateLimitingInterface
volumeQueue workqueue.RateLimitingInterface
...
}

claimQueuevolumeQueue分别是PVPVC的任务队列。

3.1.4. 其他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Identity of this controller, generated at creation time and not persisted
// across restarts. Useful only for debugging, for seeing the source of
// events. controller.provisioner may have its own, different notion of
// identity which may/may not persist across restarts
id string
component string
eventRecorder record.EventRecorder

resyncPeriod time.Duration

exponentialBackOffOnError bool
threadiness int

createProvisionedPVRetryCount int
createProvisionedPVInterval time.Duration

failedProvisionThreshold, failedDeleteThreshold int

// The port for metrics server to serve on.
metricsPort int32
// The IP address for metrics server to serve on.
metricsAddress string
// The path of metrics endpoint path.
metricsPath string

// Parameters of leaderelection.LeaderElectionConfig.
leaseDuration, renewDeadline, retryPeriod time.Duration

hasRun bool
hasRunLock *sync.Mutex

3.2. NewProvisionController方法

源码地址:https://github.com/kubernetes-incubator/external-storage/blob/master/lib/controller/controller.go#L418

NewProvisionController方法主要用来构造ProvisionController

3.2.1. 初始化默认值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(
client kubernetes.Interface,
provisionerName string,
provisioner Provisioner,
kubeVersion string,
options ...func(*ProvisionController) error,
) *ProvisionController {
...
controller := &ProvisionController{
client: client,
provisionerName: provisionerName,
provisioner: provisioner,
kubeVersion: utilversion.MustParseSemantic(kubeVersion),
id: id,
component: component,
eventRecorder: eventRecorder,
resyncPeriod: DefaultResyncPeriod,
exponentialBackOffOnError: DefaultExponentialBackOffOnError,
threadiness: DefaultThreadiness,
createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
createProvisionedPVInterval: DefaultCreateProvisionedPVInterval,
failedProvisionThreshold: DefaultFailedProvisionThreshold,
failedDeleteThreshold: DefaultFailedDeleteThreshold,
leaseDuration: DefaultLeaseDuration,
renewDeadline: DefaultRenewDeadline,
retryPeriod: DefaultRetryPeriod,
metricsPort: DefaultMetricsPort,
metricsAddress: DefaultMetricsAddress,
metricsPath: DefaultMetricsPath,
hasRun: false,
hasRunLock: &sync.Mutex{},
}
...
}

3.2.2. 初始化任务队列

1
2
3
4
5
6
7
8
9
10
11
12
ratelimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
if !controller.exponentialBackOffOnError {
ratelimiter = workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
controller.claimQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "claims")
controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "volumes")

3.2.3. ListWatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// PVC
claimSource := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
},
}
// PV
volumeSource := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().PersistentVolumes().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().PersistentVolumes().Watch(options)
},
}
// StorageClass
classSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.StorageV1().StorageClasses().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.StorageV1().StorageClasses().Watch(options)
},
}

list-watch机制是k8s中用来监听对象变化的核心机制,ListWatch包含ListFuncWatchFunc两个函数,且不能为空,以上代码分别构造了PV、PVC、StorageClass三个对象的ListWatch结构体。该机制的实现在client-gocache包中,具体参考:https://godoc.org/k8s.io/client-go/tools/cache。

更多ListWatch代码如下:

具体参考:https://github.com/kubernetes-incubator/external-storage/blob/89b0aaf6413b249b37834b124fc314ef7b8ee949/vendor/k8s.io/client-go/tools/cache/listwatch.go#L34

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}

3.2.4. ResourceEventHandlerFuncs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// PVC
claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) },
}
// PV
volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },
}
// StorageClass
classHandler := cache.ResourceEventHandlerFuncs{
// We don't need an actual event handler for StorageClasses,
// but we must pass a non-nil one to cache.NewInformer()
AddFunc: nil,
UpdateFunc: nil,
DeleteFunc: nil,
}

ResourceEventHandlerFuncs是资源事件处理函数,主要用来对k8s资源对象增删改变化的事件进行消息通知,该函数实现了ResourceEventHandler的接口。具体代码逻辑在client-go的cache包中。

更多ResourceEventHandlerFuncs代码可参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// * OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}

3.2.5. 构造Store和Controller

1、PVC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if controller.claimInformer != nil {
controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)
controller.claims, controller.claimController =
controller.claimInformer.GetStore(),
controller.claimInformer.GetController()
} else {
controller.claims, controller.claimController =
cache.NewInformer(
claimSource,
&v1.PersistentVolumeClaim{},
controller.resyncPeriod,
claimHandler,
)
}

2、PV

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if controller.volumeInformer != nil {
controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)
controller.volumes, controller.volumeController =
controller.volumeInformer.GetStore(),
controller.volumeInformer.GetController()
} else {
controller.volumes, controller.volumeController =
cache.NewInformer(
volumeSource,
&v1.PersistentVolume{},
controller.resyncPeriod,
volumeHandler,
)
}

3、StorageClass

1
2
3
4
5
6
7
8
9
10
11
12
13
if controller.classInformer != nil {
// no resource event handler needed for StorageClasses
controller.classes, controller.classController =
controller.classInformer.GetStore(),
controller.classInformer.GetController()
} else {
controller.classes, controller.classController = cache.NewInformer(
classSource,
versionedClassType,
controller.resyncPeriod,
classHandler,
)
}

通过cache.NewInformer的方法构造,入参是ListWatch结构体和ResourceEventHandlerFuncs函数等,返回值是StoreController

通过以上各个部分的构造,最后返回一个具体的ProvisionController对象。

3.3. ProvisionController.Run方法

ProvisionControllerRun方法是以常驻进程的方式运行,函数内部再运行其他的controller。

3.3.1. prometheus数据收集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Run starts all of this controller's control loops
func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {

run := func(stopCh <-chan struct{}) {
...
if ctrl.metricsPort > 0 {
prometheus.MustRegister([]prometheus.Collector{
metrics.PersistentVolumeClaimProvisionTotal,
metrics.PersistentVolumeClaimProvisionFailedTotal,
metrics.PersistentVolumeClaimProvisionDurationSeconds,
metrics.PersistentVolumeDeleteTotal,
metrics.PersistentVolumeDeleteFailedTotal,
metrics.PersistentVolumeDeleteDurationSeconds,
}...)
http.Handle(ctrl.metricsPath, promhttp.Handler())
address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10))
glog.Infof("Starting metrics server at %s\n", address)
go wait.Forever(func() {
err := http.ListenAndServe(address, nil)
if err != nil {
glog.Errorf("Failed to listen on %s: %v", address, err)
}
}, 5*time.Second)
}
...
}

3.3.2. Controller.Run

1
2
3
4
5
6
7
8
9
10
11
// If a SharedInformer has been passed in, this controller should not
// call Run again
if ctrl.claimInformer == nil {
go ctrl.claimController.Run(stopCh)
}
if ctrl.volumeInformer == nil {
go ctrl.volumeController.Run(stopCh)
}
if ctrl.classInformer == nil {
go ctrl.classController.Run(stopCh)
}

运行消息通知器Informer。

3.3.3. Worker

1
2
3
4
for i := 0; i < ctrl.threadiness; i++ {
go wait.Until(ctrl.runClaimWorker, time.Second, stopCh)
go wait.Until(ctrl.runVolumeWorker, time.Second, stopCh)
}

runClaimWorkerrunVolumeWorker分别为PVC和PV的worker,这两个的具体执行体分别是processNextClaimWorkItemprocessNextVolumeWorkItem

执行流程如下:

PVC的函数调用流程

1
runClaimWorker→processNextClaimWorkItem→syncClaimHandler→syncClaim→provisionClaimOperation

PV的函数调用流程

1
runVolumeWorker→processNextVolumeWorkItem→syncVolumeHandler→syncVolume→deleteVolumeOperation

可见最后执行的函数分别是provisionClaimOperationdeleteVolumeOperation

3.4. Operation

3.4.1. provisionClaimOperation

1、provisionClaimOperation入参是PVC,通过PVC获得PV对象,并判断PV对象是否存在,如果存在则退出后续操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns error, which indicates whether provisioning should be retried
// (requeue the claim) or not
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
// Most code here is identical to that found in controller.go of kube's PV controller...
claimClass := helper.GetPersistentVolumeClaimClass(claim)
operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
glog.Infof(logOperation(operation, "started"))

// A previous doProvisionClaim may just have finished while we were waiting for
// the locks. Check that PV (with deterministic name) hasn't been provisioned
// yet.
pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
if err == nil && volume != nil {
// Volume has been already provisioned, nothing to do.
glog.Infof(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
return nil
}
...
}

2、获取StorageClass对象中的ProvisionerReclaimPolicy参数,如果provisionerNameStorageClass对象中的provisioner字段不一致则报错并退出执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
provisioner, parameters, err := ctrl.getStorageClassFields(claimClass)
if err != nil {
glog.Errorf(logOperation(operation, "error getting claim's StorageClass's fields: %v", err))
return nil
}
if provisioner != ctrl.provisionerName {
// class.Provisioner has either changed since shouldProvision() or
// annDynamicallyProvisioned contains different provisioner than
// class.Provisioner.
glog.Errorf(logOperation(operation, "unknown provisioner %q requested in claim's StorageClass", provisioner))
return nil
}
// Check if this provisioner can provision this claim.
if err = ctrl.canProvision(claim); err != nil {
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
glog.Errorf(logOperation(operation, "failed to provision volume: %v", err))
return nil
}

reclaimPolicy := v1.PersistentVolumeReclaimDelete
if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.8.0")) {
reclaimPolicy, err = ctrl.fetchReclaimPolicy(claimClass)
if err != nil {
return err
}
}

3、执行具体的provisioner.Provision方法,构建PV对象,例如本文中的provisionernfs-provisioner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
options := VolumeOptions{
PersistentVolumeReclaimPolicy: reclaimPolicy,
PVName: pvName,
PVC: claim,
MountOptions: mountOptions,
Parameters: parameters,
SelectedNode: selectedNode,
AllowedTopologies: allowedTopologies,
}

ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))

volume, err = ctrl.provisioner.Provision(options)
if err != nil {
if ierr, ok := err.(*IgnoredError); ok {
// Provision ignored, do nothing and hope another provisioner will provision it.
glog.Infof(logOperation(operation, "volume provision ignored: %v", ierr))
return nil
}
err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
return err
}

4、创建k8s的PV对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
glog.Infof(logOperation(operation, "trying to save persistentvvolume %q", volume.Name))
if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.Infof(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
err = nil
} else {
glog.Infof(logOperation(operation, "persistentvolume %q saved", volume.Name))
}
break
}
// Save failed, try again after a while.
glog.Infof(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}

5、创建PV失败,清理存储资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
...
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
if err = ctrl.provisioner.Delete(volume); err == nil {
// Delete succeeded
glog.Infof(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
break
}
// Delete failed, try again after a while.
glog.Infof(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}
if err != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
}
}

如果创建成功,则打印成功的日志,并返回nil

3.4.2. deleteVolumeOperation

1、deleteVolumeOperation入参是PV,先获得PV对象,并判断是否需要删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// deleteVolumeOperation attempts to delete the volume backing the given
// volume. Returns error, which indicates whether deletion should be retried
// (requeue the volume) or not
func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {
...
// This method may have been waiting for a volume lock for some time.
// Our check does not have to be as sophisticated as PV controller's, we can
// trust that the PV controller has set the PV to Released/Failed and it's
// ours to delete
newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
if err != nil {
return nil
}
if !ctrl.shouldDelete(newVolume) {
glog.Infof(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))
return nil
}
...
}

2、调用具体的provisionerDelete方法,例如,如果是nfs-provisioner,则是调用nfs-provisioner的Delete方法。

1
2
3
4
5
6
7
8
9
10
11
12
err = ctrl.provisioner.Delete(volume)
if err != nil {
if ierr, ok := err.(*IgnoredError); ok {
// Delete ignored, do nothing and hope another provisioner will delete it.
glog.Infof(logOperation(operation, "volume deletion ignored: %v", ierr))
return nil
}
// Delete failed, emit an event.
glog.Errorf(logOperation(operation, "volume deletion failed: %v", err))
ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())
return err
}

3、删除k8s中的PV对象。

1
2
3
4
5
6
7
// Delete the volume
if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
// Oops, could not delete the volume and therefore the controller will
// try to delete the volume again on next update.
glog.Infof(logOperation(operation, "failed to delete persistentvolume: %v", err))
return err
}

4. 总结

  1. Provisioner接口包含ProvisionDelete两个方法,自定义的provisioner需要实现这两个方法,这两个方法只是处理了跟存储类型相关的事项,并没有针对PVPVC对象的增删等操作。
  2. Provision方法主要用来构造PV对象,不同类型的Provisioner的,一般是PersistentVolumeSource类型和参数不同,例如nfs-provisioner对应的PersistentVolumeSourceNFS,并且需要传入NFS相关的参数:ServerPath等。
  3. Delete方法主要针对对应的存储类型,做数据存档(备份)或删除的处理。
  4. StorageClass对象需要单独创建,用来指定具体的provisioner来执行相关逻辑。
  5. provisionClaimOperationdeleteVolumeOperation具体执行了k8s中PV对象的创建和删除操作,同时调用了具体provisionerProvisionDelete两个方法来对存储数据做处理。

参考文章

推荐文章