// Provisioner is an interface that creates templates for PersistentVolumes
// and can create the volume as a new resource in the infrastructure provider.
// It can also remove the volume it created from the underlying storage
// provider.
type Provisioner interface {
    // Provision creates a volume i.e. the storage asset and returns a PV object
    // for the volume
    Provision(VolumeOptions) (*v1.PersistentVolume, error)
    // Delete removes the storage asset that was created by Provision backing the
    // given PV. Does not delete the PV object itself.
    //
    // May return IgnoredError to indicate that the call has been ignored and no
    // action taken.
    Delete(*v1.PersistentVolume) error
}
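To make the contract concrete, here is a minimal sketch of a type implementing this interface. The type name and fields are illustrative, not the library's (the nfs-client provisioner carries its NFS server address and export path in much the same way):

type nfsProvisioner struct {
    client kubernetes.Interface
    server string // NFS server address, e.g. read from NFS_SERVER
    path   string // NFS export path, e.g. read from NFS_PATH
}

// Compile-time assertion that the interface is satisfied.
var _ controller.Provisioner = &nfsProvisioner{}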
// VolumeOptions contains option information about a volume
// https://github.com/kubernetes/kubernetes/blob/release-1.4/pkg/volume/plugins.go
type VolumeOptions struct {
    // Reclamation policy for a persistent volume
    PersistentVolumeReclaimPolicy v1.PersistentVolumeReclaimPolicy

    // PV.Name of the appropriate PersistentVolume. Used to generate cloud
    // volume name.
    PVName string

    // PV mount options. Not validated - mount of the PVs will simply fail if one is invalid.
    MountOptions []string

    // PVC is a reference to the claim that led to provisioning of a new PV.
    // Provisioners *must* create a PV that would be matched by this PVC,
    // i.e. with required capacity, accessMode, labels matching PVC.Selector and
    // so on.
    PVC *v1.PersistentVolumeClaim

    // Volume provisioning parameters from StorageClass
    Parameters map[string]string

    // Node selected by the scheduler for the volume.
    SelectedNode *v1.Node

    // Topology constraint parameter from StorageClass
    AllowedTopologies []v1.TopologySelectorTerm
}
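To show how a Provisioner consumes these options, here is a hedged sketch of a Provision implementation, modeled on the nfs-client provisioner (p.server and p.path are the illustrative fields from the skeleton above). The key obligation is that the returned PV must be matched by the PVC: same access modes, at least the requested capacity, and so on:

func (p *nfsProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
    // Directory backing this PV on the NFS export (illustrative naming).
    pvPath := filepath.Join(p.path, options.PVName)
    // ... create pvPath on the export here ...
    pv := &v1.PersistentVolume{
        ObjectMeta: metav1.ObjectMeta{
            Name: options.PVName,
        },
        Spec: v1.PersistentVolumeSpec{
            PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
            AccessModes:                   options.PVC.Spec.AccessModes,
            MountOptions:                  options.MountOptions,
            Capacity: v1.ResourceList{
                // Satisfy the capacity requested by the claim.
                v1.ResourceStorage: options.PVC.Spec.Resources.Requests[v1.ResourceStorage],
            },
            PersistentVolumeSource: v1.PersistentVolumeSource{
                NFS: &v1.NFSVolumeSource{
                    Server:   p.server,
                    Path:     pvPath,
                    ReadOnly: false,
                },
            },
        },
    }
    return pv, nil
}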
server := os.Getenv("NFS_SERVER")
if server == "" {
    glog.Fatal("NFS_SERVER not set")
}
path := os.Getenv("NFS_PATH")
if path == "" {
    glog.Fatal("NFS_PATH not set")
}
provisionerName := os.Getenv(provisionerNameKey)
if provisionerName == "" {
    glog.Fatalf("environment variable %s is not set! Please set it.", provisionerNameKey)
}
...
}
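All three lookups share the same fail-fast shape, so they could be folded into a small helper; a sketch (mustGetenv is our name, not the provisioner's):

// mustGetenv returns the value of an environment variable or exits the
// process if it is unset (hypothetical helper, not in the original source).
func mustGetenv(key string) string {
    value := os.Getenv(key)
    if value == "" {
        glog.Fatalf("environment variable %s is not set! Please set it.", key)
    }
    return value
}

With it, the block above reduces to server := mustGetenv("NFS_SERVER"), and so on.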
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-nfs-storage
provisioner: fuseim.pri/ifs # or choose another name, must match deployment's env PROVISIONER_NAME
parameters:
  archiveOnDelete: "false" # When set to "false" your PVs will not be archived by the provisioner upon deletion of the PVC.
2.1.2. Obtaining the clientset object
The source code is as follows:
// Create an InClusterConfig and use it to create a client for the controller
// to use to communicate with Kubernetes
config, err := rest.InClusterConfig()
if err != nil {
    glog.Fatalf("Failed to create config: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
    glog.Fatalf("Failed to create client: %v", err)
}
// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
    glog.Fatalf("Error getting server version: %v", err)
}
// Start the provision controller which will dynamically provision NFS PVs
pc := controller.NewProvisionController(clientset, provisionerName, clientNFSProvisioner, serverVersion.GitVersion)
pc.Run(wait.NeverStop)
// Get the storage class for this volume.
storageClass, err := p.getClassForVolume(volume)
if err != nil {
    return err
}
// Determine if the "archiveOnDelete" parameter exists.
// If it exists and has a falsey value, delete the directory.
// Otherwise, archive it.
archiveOnDelete, exists := storageClass.Parameters["archiveOnDelete"]
if exists {
    archiveBool, err := strconv.ParseBool(archiveOnDelete)
    if err != nil {
        return err
    }
    if !archiveBool {
        return os.RemoveAll(oldPath)
    }
}
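For completeness, the archive branch that follows this check renames the directory rather than removing it, so the data survives the PV's deletion. Roughly (paraphrased from the nfs-client provisioner; mountPath, oldPath and pvName are locals established earlier in Delete):

// Archive instead of delete: rename the backing directory so the data
// outlives the PV.
archivePath := filepath.Join(mountPath, "archived-"+pvName)
glog.V(4).Infof("archiving path %s to %s", oldPath, archivePath)
return os.Rename(oldPath, archivePath)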
// ProvisionController is a controller that provisions PersistentVolumes for
// PersistentVolumeClaims.
type ProvisionController struct {
    client kubernetes.Interface

    // The name of the provisioner for which this controller dynamically
    // provisions volumes. The value of annDynamicallyProvisioned and
    // annStorageProvisioner to set & watch for, respectively
    provisionerName string

    // The provisioner the controller will use to provision and delete volumes.
    // Presumably this implementer of Provisioner carries its own
    // volume-specific options and such that it needs in order to provision
    // volumes.
    provisioner Provisioner

    // Kubernetes cluster server version:
    // * 1.4: storage classes introduced as beta. Technically out-of-tree dynamic
    //   provisioning is not officially supported, though it works
    // * 1.5: storage classes stay in beta. Out-of-tree dynamic provisioning is
    //   officially supported
    // * 1.6: storage classes enter GA
    kubeVersion *utilversion.Version
    ...
}
// Identity of this controller, generated at creation time and not persisted
// across restarts. Useful only for debugging, for seeing the source of
// events. controller.provisioner may have its own, different notion of
// identity which may/may not persist across restarts
id        string
component string

eventRecorder record.EventRecorder

resyncPeriod time.Duration

exponentialBackOffOnError bool
threadiness               int

createProvisionedPVRetryCount int
createProvisionedPVInterval   time.Duration

failedProvisionThreshold, failedDeleteThreshold int

// The port for the metrics server to serve on.
metricsPort int32
// The IP address for the metrics server to serve on.
metricsAddress string
// The path of the metrics endpoint.
metricsPath string

// Parameters of leaderelection.LeaderElectionConfig.
leaseDuration, renewDeadline, retryPeriod time.Duration
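Most of these fields can be tuned when the controller is constructed, through functional options passed to NewProvisionController. A hedged sketch (the option names follow the external-storage library and may differ between versions):

pc := controller.NewProvisionController(
    clientset,
    provisionerName,
    clientNFSProvisioner,
    serverVersion.GitVersion,
    controller.ResyncPeriod(15*time.Second),  // how often to re-list claims and volumes
    controller.Threadiness(4),                // number of claim/volume workers
    controller.FailedProvisionThreshold(5),   // retries before a failing claim is dropped
)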
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)
}
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
    ListFunc  ListFunc
    WatchFunc WatchFunc
    // DisableChunking requests no chunking for this list watcher.
    DisableChunking bool
}
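The provision controller builds exactly this kind of ListWatch for claims, volumes and storage classes. A sketch of the claim source, using the client-go API of this era (newer client-go versions also take a context):

claimSource := &cache.ListWatch{
    ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
        return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).List(options)
    },
    WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
        return client.CoreV1().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
    },
}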
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
//  * OnAdd is called when an object is added.
//  * OnUpdate is called when an object is modified. Note that oldObj is the
//    last known state of the object-- it is possible that several changes
//    were combined together, so you can't use this to see every single
//    change. OnUpdate is also called when a re-list happens, and it will
//    get called even if nothing changed. This is useful for periodically
//    evaluating or syncing something.
//  * OnDelete will get the final state of the item if it is known, otherwise
//    it will get an object of type DeletedFinalStateUnknown. This can
//    happen if the watch is closed and misses the delete event and we don't
//    notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}
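In the provision controller these adaptors do little more than push keys onto workqueues. A simplified sketch of the claim handler (enqueueClaim stands in for the library's internal helper that adds the claim's key to claimQueue):

claimHandler := cache.ResourceEventHandlerFuncs{
    AddFunc:    func(obj interface{}) { ctrl.enqueueClaim(obj) },
    UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueClaim(newObj) },
    DeleteFunc: func(obj interface{}) {
        // The claim is gone; there is nothing left to provision for it.
    },
}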
// Run starts all of this controller's control loops
func (ctrl *ProvisionController) Run(stopCh <-chan struct{}) {
    run := func(stopCh <-chan struct{}) {
        ...
        if ctrl.metricsPort > 0 {
            prometheus.MustRegister([]prometheus.Collector{
                metrics.PersistentVolumeClaimProvisionTotal,
                metrics.PersistentVolumeClaimProvisionFailedTotal,
                metrics.PersistentVolumeClaimProvisionDurationSeconds,
                metrics.PersistentVolumeDeleteTotal,
                metrics.PersistentVolumeDeleteFailedTotal,
                metrics.PersistentVolumeDeleteDurationSeconds,
            }...)
            http.Handle(ctrl.metricsPath, promhttp.Handler())
            address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10))
            glog.Infof("Starting metrics server at %s\n", address)
            go wait.Forever(func() {
                err := http.ListenAndServe(address, nil)
                if err != nil {
                    glog.Errorf("Failed to listen on %s: %v", address, err)
                }
            }, 5*time.Second)
        }
        ...
    }
3.3.2. Controller.Run
// If a SharedInformer has been passed in, this controller should not
// call Run again
if ctrl.claimInformer == nil {
    go ctrl.claimController.Run(stopCh)
}
if ctrl.volumeInformer == nil {
    go ctrl.volumeController.Run(stopCh)
}
if ctrl.classInformer == nil {
    go ctrl.classController.Run(stopCh)
}
This starts the informers, i.e. the event notifiers.
3.3.3. Worker
for i := 0; i < ctrl.threadiness; i++ {
    go wait.Until(ctrl.runClaimWorker, time.Second, stopCh)
    go wait.Until(ctrl.runVolumeWorker, time.Second, stopCh)
}
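Each worker runs the standard client-go workqueue loop: pop a key, sync it, requeue with rate limiting on failure. A simplified sketch of the claim side (the volume side is symmetric; syncClaimHandler is the library's dispatch into provisionClaimOperation):

func (ctrl *ProvisionController) runClaimWorker() {
    for ctrl.processNextClaimWorkItem() {
    }
}

// processNextClaimWorkItem processes one item off the claim queue; a
// simplified sketch of the library's implementation.
func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
    obj, shutdown := ctrl.claimQueue.Get()
    if shutdown {
        return false
    }
    defer ctrl.claimQueue.Done(obj)

    if err := ctrl.syncClaimHandler(obj.(string)); err != nil {
        // Requeue with rate limiting so repeated failures back off.
        ctrl.claimQueue.AddRateLimited(obj)
        return true
    }
    // Success: reset this key's rate-limiting history.
    ctrl.claimQueue.Forget(obj)
    return true
}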
// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns error, which indicates whether provisioning should be retried
// (requeue the claim) or not
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
    // Most code here is identical to that found in controller.go of kube's PV controller...
    claimClass := helper.GetPersistentVolumeClaimClass(claim)
    operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
    glog.Infof(logOperation(operation, "started"))

    // A previous doProvisionClaim may just have finished while we were waiting for
    // the locks. Check that PV (with deterministic name) hasn't been provisioned
    // yet.
    pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
    if err == nil && volume != nil {
        // Volume has been already provisioned, nothing to do.
        glog.Infof(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
        return nil
    }
    ...
}
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))
volume, err = ctrl.provisioner.Provision(options)
if err != nil {
    if ierr, ok := err.(*IgnoredError); ok {
        // Provision ignored, do nothing and hope another provisioner will provision it.
        glog.Infof(logOperation(operation, "volume provision ignored: %v", ierr))
        return nil
    }
    err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
    return err
}
4. Create the PV object in Kubernetes.
// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
    glog.Infof(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
    if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
        // Save succeeded.
        if err != nil {
            glog.Infof(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
            err = nil
        } else {
            glog.Infof(logOperation(operation, "persistentvolume %q saved", volume.Name))
        }
        break
    }
    // Save failed, try again after a while.
    glog.Infof(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
    time.Sleep(ctrl.createProvisionedPVInterval)
}
if err != nil {
    // Save failed. Now we have a storage asset outside of Kubernetes,
    // but we don't have appropriate PV object for it.
    // Emit some event here and try to delete the storage asset several
    // times.
    ...
    for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
        if err = ctrl.provisioner.Delete(volume); err == nil {
            // Delete succeeded
            glog.Infof(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
            break
        }
        // Delete failed, try again after a while.
        glog.Infof(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
        time.Sleep(ctrl.createProvisionedPVInterval)
    }
    if err != nil {
        // Delete failed several times. There is an orphaned volume and there
        // is nothing we can do about it.
        strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
        glog.Error(logOperation(operation, strerr))
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
    }
}
// deleteVolumeOperation attempts to delete the volume backing the given
// volume. Returns error, which indicates whether deletion should be retried
// (requeue the volume) or not
func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {
    ...
    // This method may have been waiting for a volume lock for some time.
    // Our check does not have to be as sophisticated as PV controller's, we can
    // trust that the PV controller has set the PV to Released/Failed and it's
    // ours to delete
    newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
    if err != nil {
        return nil
    }
    if !ctrl.shouldDelete(newVolume) {
        glog.Infof(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))
        return nil
    }
    ...
}
err = ctrl.provisioner.Delete(volume)
if err != nil {
    if ierr, ok := err.(*IgnoredError); ok {
        // Delete ignored, do nothing and hope another provisioner will delete it.
        glog.Infof(logOperation(operation, "volume deletion ignored: %v", ierr))
        return nil
    }
    // Delete failed, emit an event.
    glog.Errorf(logOperation(operation, "volume deletion failed: %v", err))
    ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())
    return err
}
3. Delete the PV object from Kubernetes.
// Delete the volume
if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
    // Oops, could not delete the volume and therefore the controller will
    // try to delete the volume again on next update.
    glog.Infof(logOperation(operation, "failed to delete persistentvolume: %v", err))
    return err
}