本文主要分析client-go中使用的workqueue,从而来分析k8s是如何基于任务队列做并发控制的。其中代码参考:
1. 概述
k8s的控制器大多是基于任务队列的方式进行并发控制,甚至包括基于controller-manager开发的自定义operator控制器。以下我们以deployment controller为例展示workqueue在k8s中的使用。
2. Deployment中的workqueue
2.1. 初始化workqueue
Deployment 的构建函数NewDeploymentController中初始化了任务队列和对于event事件处理相对应的workqueue的操作。
- 初始化workqueue的限速任务队列。
- 添加event handler使得deployment对象入队,等待处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) { dc := &DeploymentController{ queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"), } dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { dc.addDeployment(logger, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { dc.updateDeployment(logger, oldObj, newObj) }, DeleteFunc: func(obj interface{}) { dc.deleteDeployment(logger, obj) }, }) dc.enqueueDeployment = dc.enqueue
|
以下显示event handler具体逻辑,可见,无论是add,update,delete event handler都是调用enqueueDeployment的函数,而enqueueDeployment实际上就是初始化函数中的dc.enqueue。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) { d := obj.(*apps.Deployment) logger.V(4).Info("Adding deployment", "deployment", klog.KObj(d)) dc.enqueueDeployment(d) }
func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) { oldD := old.(*apps.Deployment) curD := cur.(*apps.Deployment) logger.V(4).Info("Updating deployment", "deployment", klog.KObj(oldD)) dc.enqueueDeployment(curD) }
func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) { d, ok := obj.(*apps.Deployment) logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d)) dc.enqueueDeployment(d) }
|
2.2. 添加任务到队列
以下是dc.enqueue的具体实现,可见最终的调用workqueue的Add方法,添加对象到任务队列中。
1 2 3 4 5
| func (dc *DeploymentController) enqueue(deployment *apps.Deployment) { key, err := controller.KeyFunc(deployment) dc.queue.Add(key) }
|
其中key是namespace/name
的拼接字符串,通过MetaNamespaceKeyFunc函数获取对象的key。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func MetaNamespaceKeyFunc(obj interface{}) (string, error) { if key, ok := obj.(ExplicitKey); ok { return string(key), nil } objName, err := ObjectToName(obj) if err != nil { return "", err } return objName.String(), nil }
func (objName ObjectName) String() string { if len(objName.Namespace) > 0 { return objName.Namespace + "/" + objName.Name } return objName.Name }
|
2.3. 读取任务队列
deployment controller会调用dc.queue.Get()
来读取任务队列中的对象。该goroutine是一个常驻的逻辑实时获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool { key, quit := dc.queue.Get() if quit { return false } defer dc.queue.Done(key) err := dc.syncHandler(ctx, key.(string)) dc.handleErr(ctx, err, key)
return true }
|
任务的错误处理:
- 如果错误为空,则调用Forget移出队列
- 如果小于最大重试次数,则加入延迟队列
- 如果大于最大重试次数,则移出队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func (dc *DeploymentController) handleErr(ctx context.Context, err error, key interface{}) { if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { dc.queue.Forget(key) return } ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string)) if dc.queue.NumRequeues(key) < maxRetries { dc.queue.AddRateLimited(key) return } utilruntime.HandleError(err) logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err) dc.queue.Forget(key) }
|
syncDeployment获取deployment对象,基于ns和name重新获取deployment。
1 2 3 4 5 6 7 8 9 10 11 12
| func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) deployment, err := dc.dLister.Deployments(namespace).Get(name) if errors.IsNotFound(err) { logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name)) return nil }
d := deployment.DeepCopy()
|
待完善