// Run runs the KubeControllerManagerOptions. This should never exit. funcRun(c *config.CompletedConfig, stopCh <-chanstruct{})error { ... controllerContext.InformerFactory.Start(controllerContext.Stop) close(controllerContext.InformersStarted) ... }
// SharedInformerFactory a small interface to allow for adding an informer without an import cycle type SharedInformerFactory interface { Start(stopCh <-chanstruct{}) InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer }
for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } } }
// Separate stop channel because Processor should be stopped strictly after controller processorStopCh := make(chanstruct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop deferclose(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) wg.StartWithChannel(processorStopCh, s.processor.run)
deferfunc() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true// Don't want any new listeners }() s.controller.Run(stopCh) }
func(d *defaultCacheMutationDetector)Run(stopCh <-chanstruct{}) { // we DON'T want protection from panics. If we're running this code, we want to die for { d.CompareObjects()
select { case <-stopCh: return case <-time.After(d.period): } } }
func(p *sharedProcessor)run(stopCh <-chanstruct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } }() <-stopCh p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop }
该部分逻辑待后面分析。
2.6. controller.Run
调用s.controller.Run,构建Reflector,进行对etcd的缓存
1 2 3 4 5 6
deferfunc() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true// Don't want any new listeners }() s.controller.Run(stopCh)
// Run begins processing items, and will continue until a value is sent down stopCh. // It's an error to call Run more than once. // Run blocks; call via go. func(c *controller)Run(stopCh <-chanstruct{}) { defer utilruntime.HandleCrash() gofunc() { <-stopCh c.config.Queue.Close() }() r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock
c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock()
// Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { // name identifies this reflector. By default it will be a file:line if possible. name string // metrics tracks basic metric information about the reflector metrics *reflectorMetrics
// The type of object we expect to place in the store. expectedType reflect.Type // The destination to sync up with the watch source store Store // listerWatcher is used to perform lists and watches. listerWatcher ListerWatcher // period controls timing between one watch ending and // the beginning of the next one. period time.Duration resyncPeriod time.Duration ShouldResync func()bool // clock allows tests to manipulate time clock clock.Clock // lastSyncResourceVersion is the resource version token last // observed when doing a sync with the underlying store // it is thread safe, but not synchronized with the underlying store lastSyncResourceVersion string // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex }
// NewReflector creates a new Reflector object which will keep the given store up to // date with the server's contents for the given resource. Reflector promises to // only put things in the store that have the type of expectedType, unless expectedType // is nil. If resyncPeriod is non-zero, then lists will be executed after every // resyncPeriod, so that you can use reflectors to periodically process everything as // well as incrementally processing the things that change. funcNewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod) }
// reflectorDisambiguator is used to disambiguate started reflectors. // initialized to an unstable value to ensure meaning isn't attributed to the suffix. var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)
// NewNamedReflector same as NewReflector, but with a specified name for logging funcNewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1) r := &Reflector{ name: name, // we need this to be unique per process (some names are still the same)but obvious who it belongs to metrics: newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))), listerWatcher: lw, store: store, expectedType: reflect.TypeOf(expectedType), period: time.Second, resyncPeriod: resyncPeriod, clock: &clock.RealClock{}, } return r }
3.3. Reflector.Run
Reflector.Run主要执行了ListAndWatch的方法。
1 2 3 4 5 6 7 8 9 10
// Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run will exit when stopCh is closed. func(r *Reflector)Run(stopCh <-chanstruct{}) { glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) wait.Until(func() { if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err) } }, r.period, stopCh) }
// ListAndWatch first lists all items and get the resource version at the moment of call, // and then use the resource version to watch. // It returns error if ListAndWatch didn't even try to initialize watch. func(r *Reflector)ListAndWatch(stopCh <-chanstruct{})error { glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name) var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List() // to be served from cache and potentially be delayed relative to // etcd contents. Reflector framework will catch up via Watch() eventually. options := metav1.ListOptions{ResourceVersion: "0"} r.metrics.numberOfLists.Inc() start := r.clock.Now() list, err := r.listerWatcher.List(options) if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) } r.metrics.listDuration.Observe(time.Since(start).Seconds()) listMetaInterface, err := meta.ListAccessor(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) } resourceVersion = listMetaInterface.GetResourceVersion() items, err := meta.ExtractList(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) } r.metrics.numberOfItemsInList.Observe(float64(len(items))) if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) } r.setLastSyncResourceVersion(resourceVersion) ... }
// syncWith replaces the store's items with the given list. func(r *Reflector)syncWith(items []runtime.Object, resourceVersion string)error { found := make([]interface{}, 0, len(items)) for _, item := range items { found = append(found, item) } return r.store.Replace(found, resourceVersion) }
Store.Replace方法定义如下:
1 2 3 4 5 6 7 8
type Store interface { ... // Replace will delete the contents of the store, using instead the // given list. Store takes ownership of the list, you should not reference // it after calling this function. Replace([]interface{}, string) error ... }
最后设置最新的资源版本号。
1
r.setLastSyncResourceVersion(resourceVersion)
setLastSyncResourceVersion:
1 2 3 4 5 6 7 8 9 10
func(r *Reflector)setLastSyncResourceVersion(v string) { r.lastSyncResourceVersionMutex.Lock() defer r.lastSyncResourceVersionMutex.Unlock() r.lastSyncResourceVersion = v
for { // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors select { case <-stopCh: returnnil default: }
timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options = metav1.ListOptions{ ResourceVersion: resourceVersion, // We want to avoid situations of hanging watchers. Stop any wachers that do not // receive any events within the timeout window. TimeoutSeconds: &timemoutseconds, }
r.metrics.numberOfWatches.Inc() w, err := r.listerWatcher.Watch(options) if err != nil { switch err { case io.EOF: // watch closed normally case io.ErrUnexpectedEOF: glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) default: utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) } // If this is "connection refused" error, it means that most likely apiserver is not responsive. // It doesn't make sense to re-list all objects because most likely we will be able to restart // watch where we ended. // If that's the case wait and resend watch request. if urlError, ok := err.(*url.Error); ok { if opError, ok := urlError.Err.(*net.OpError); ok { if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { time.Sleep(time.Second) continue } } } returnnil }
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) } returnnil } }
设置watch的超时时间,默认为5分钟。
1 2 3 4 5 6 7
timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options = metav1.ListOptions{ ResourceVersion: resourceVersion, // We want to avoid situations of hanging watchers. Stop any wachers that do not // receive any events within the timeout window. TimeoutSeconds: &timemoutseconds, }
// watchHandler watches w and keeps *resourceVersion up to date. func(r *Reflector)watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chanstruct{})error { start := r.clock.Now() eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way // we're coming back in with the same watch interface. defer w.Stop() // update metrics deferfunc() { r.metrics.numberOfItemsInWatch.Observe(float64(eventCount)) r.metrics.watchDuration.Observe(time.Since(start).Seconds()) }()
loop: for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): if !ok { break loop } if event.Type == watch.Error { return apierrs.FromObject(event.Object) } if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a { utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) continue } meta, err := meta.Accessor(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) continue } newResourceVersion := meta.GetResourceVersion() switch event.Type { case watch.Added: err := r.store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Modified: err := r.store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Deleted: // TODO: Will any consumers need access to the "last known // state", which is passed in event.Object? If so, may need // to change this. err := r.store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } }
watchDuration := r.clock.Now().Sub(start) if watchDuration < 1*time.Second && eventCount == 0 { r.metrics.numberOfShortWatches.Inc() return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name) } glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount) returnnil }
获取watch接口中的事件的channel,来获取事件的内容。
1 2 3 4 5 6
for { select { ... case event, ok := <-w.ResultChan(): ... }
switch event.Type { case watch.Added: err := r.store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Modified: err := r.store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Deleted: // TODO: Will any consumers need access to the "last known // state", which is passed in event.Object? If so, may need // to change this. err := r.store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) }
// NewDeltaFIFO returns a Store which can be used process changes to items. // // keyFunc is used to figure out what key an object should have. (It's // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) // // 'compressor' may compress as many or as few items as it wants // (including returning an empty slice), but it should do what it // does quickly since it is called while the queue is locked. // 'compressor' may be nil if you don't want any delta compression. // // 'keyLister' is expected to return a list of keys that the consumer of // this queue "knows about". It is used to decide which items are missing // when Replace() is called; 'Deleted' deltas are produced for these items. // It may be nil if you don't need to detect all deletions. // TODO: consider merging keyLister with this object, tracking a list of // "known" keys when Pop() is called. Have to think about how that // affects error retrying. // TODO(lavalamp): I believe there is a possible race only when using an // external known object source that the above TODO would // fix. // // Also see the comment on DeltaFIFO. funcNewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO { f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: keyFunc, deltaCompressor: compressor, knownObjects: knownObjects, } f.cond.L = &f.lock return f }
// NewNamedReflector same as NewReflector, but with a specified name for logging funcNewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1) r := &Reflector{ name: name, // we need this to be unique per process (some names are still the same)but obvious who it belongs to metrics: newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))), listerWatcher: lw, store: store, expectedType: reflect.TypeOf(expectedType), period: time.Second, resyncPeriod: resyncPeriod, clock: &clock.RealClock{}, } return r }
// DeltaFIFO is like FIFO, but allows you to process deletes. // // DeltaFIFO is a producer-consumer queue, where a Reflector is // intended to be the producer, and the consumer is whatever calls // the Pop() method. // // DeltaFIFO solves this use case: // * You want to process every object change (delta) at most once. // * When you process an object, you want to see everything // that's happened to it since you last processed it. // * You want to process the deletion of objects. // * You might want to periodically reprocess objects. // // DeltaFIFO's Pop(), Get(), and GetByKey() methods return // interface{} to satisfy the Store/Queue interfaces, but it // will always return an object of type Deltas. // // A note on threading: If you call Pop() in parallel from multiple // threads, you could end up with multiple threads processing slightly // different versions of the same object. // // A note on the KeyLister used by the DeltaFIFO: It's main purpose is // to list keys that are "known", for the purpose of figuring out which // items have been deleted when Replace() or Delete() are called. The deleted // object will be included in the DeleteFinalStateUnknown markers. These objects // could be stale. // // You may provide a function to compress deltas (e.g., represent a // series of Updates as a single Update). type DeltaFIFO struct { // lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex cond sync.Cond
// We depend on the property that items in the set are in // the queue and vice versa, and that all Deltas in this // map have at least one Delta. items map[string]Deltas queue []string
// populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool // initialPopulationCount is the number of items inserted by the first call of Replace() initialPopulationCount int
// keyFunc is used to make the key used for queued item // insertion and retrieval, and should be deterministic. keyFunc KeyFunc
// deltaCompressor tells us how to combine two or more // deltas. It may be nil. deltaCompressor DeltaCompressor
// knownObjects list keys that are "known", for the // purpose of figuring out which items have been deleted // when Replace() or Delete() is called. knownObjects KeyListerGetter
// Indication the queue is closed. // Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex }
// Queue is exactly like a Store, but has a Pop() method too. type Queue interface { Store
// Pop blocks until it has something to process. // It returns the object that was process and the result of processing. // The PopProcessFunc may return an ErrRequeue{...} to indicate the item // should be requeued before releasing the lock on the queue. Pop(PopProcessFunc) (interface{}, error)
// AddIfNotPresent adds a value previously // returned by Pop back into the queue as long // as nothing else (presumably more recent) // has since been added. AddIfNotPresent(interface{}) error
// Return true if the first batch of items has been popped HasSynced() bool
// Store is a generic object storage interface. Reflector knows how to watch a server // and update a store. A generic store is provided, which allows Reflector to be used // as a local caching system, and an LRU store, which allows Reflector to work like a // queue of items yet to be processed. // // Store makes no assumptions about stored object identity; it is the responsibility // of a Store implementation to provide a mechanism to correctly key objects and to // define the contract for obtaining objects by some arbitrary key type. type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the // given list. Store takes ownership of the list, you should not reference // it after calling this function. Replace([]interface{}, string) error Resync() error }
// cache responsibilities are limited to: // 1. Computing keys for objects via keyFunc // 2. Invoking methods of a ThreadSafeStorage interface type cache struct { // cacheStorage bears the burden of thread safety for the cache cacheStorage ThreadSafeStore // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc }
其中ListAndWatch主要用到以下的方法:
cache.Replace
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Replace will delete the contents of 'c', using instead the given list. // 'c' takes ownership of the list, you should not reference the list again // after calling this function. func(c *cache)Replace(list []interface{}, resourceVersion string)error { items := map[string]interface{}{} for _, item := range list { key, err := c.keyFunc(item) if err != nil { return KeyError{item, err} } items[key] = item } c.cacheStorage.Replace(items, resourceVersion) returnnil }
cache.Add
1 2 3 4 5 6 7 8 9
// Add inserts an item into the cache. func(c *cache)Add(obj interface{})error { key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} } c.cacheStorage.Add(key, obj) returnnil }
cache.Update
1 2 3 4 5 6 7 8 9
// Update sets an item in the cache to its updated state. func(c *cache)Update(obj interface{})error { key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} } c.cacheStorage.Update(key, obj) returnnil }
cache.Delete
1 2 3 4 5 6 7 8 9
// Delete removes an item from the cache. func(c *cache)Delete(obj interface{})error { key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} } c.cacheStorage.Delete(key) returnnil }
// ThreadSafeStore is an interface that allows concurrent access to a storage backend. // TL;DR caveats: you must not modify anything returned by Get or List as it will break // the indexing feature in addition to not being thread safe. // // The guarantees of thread safety provided by List/Get are only valid if the caller // treats returned items as read-only. For example, a pointer inserted in the store // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get` // on the same key and modify the pointer in a non-thread-safe way. Also note that // modifying objects stored by the indexers (if any) will *not* automatically lead // to a re-index. So it's not a good idea to directly modify the objects returned by // Get/List, in general. type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error Resync() error }
// processLoop drains the work queue. // TODO: Consider doing the processing in parallel. This will require a little thought // to make sure that we don't end up processing the same object multiple times // concurrently. // // TODO: Plumb through the stopCh here (and down to the queue) so that this can // actually exit when the controller is stopped. Or just give up on this stuff // ever being stoppable. Converting this whole package to use Context would // also be helpful. func(c *controller)processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == FIFOClosedError { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
// Pop blocks until an item is added to the queue, and then returns it. If // multiple items are ready, they are returned in the order in which they were // added/updated. The item is removed from the queue (and the store) before it // is returned, so if you don't successfully process it, you need to add it back // with AddIfNotPresent(). // process function is called under lock, so it is safe update data structures // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc // may return an instance of ErrRequeue with a nested error to indicate the current // item should be requeued (equivalent to calling AddIfNotPresent under the lock). // // Pop returns a 'Deltas', which has a complete list of all the things // that happened to the object (deltas) while it was sitting in the queue. func(f *DeltaFIFO)Pop(process PopProcessFunc)(interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { forlen(f.queue) == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). if f.IsClosed() { returnnil, FIFOClosedError }
f.cond.Wait() } id := f.queue[0] f.queue = f.queue[1:] item, ok := f.items[id] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } if !ok { // Item may have been deleted subsequently. continue } delete(f.items, id) err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err } }
核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 13
for { ... item, ok := f.items[id] ... err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err }
func(p *sharedProcessor)run(stopCh <-chanstruct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } }() <-stopCh p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop }
var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil// Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications notification = notificationToAdd nextCh = p.nextCh } else { // There is already a notification waiting to be dispatched p.pendingNotifications.WriteOne(notificationToAdd) } } } }
for next := range p.nextCh { switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next)) } } }
// ResourceEventHandler can handle notifications for events that happen to a // resource. The events are informational only, so you can't return an // error. // * OnAdd is called when an object is added. // * OnUpdate is called when an object is modified. Note that oldObj is the // last known state of the object-- it is possible that several changes // were combined together, so you can't use this to see every single // change. OnUpdate is also called when a re-list happens, and it will // get called even if nothing changed. This is useful for periodically // evaluating or syncing something. // * OnDelete will get the final state of the item if it is known, otherwise // it will get an object of type DeletedFinalStateUnknown. This can // happen if the watch is closed and misses the delete event and we don't // notice the deletion until the subsequent re-list. type ResourceEventHandler interface { OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) }
// NewDeploymentController creates a new DeploymentController. funcNewDeploymentController(dInformer extensionsinformers.DeploymentInformer, rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface)(*DeploymentController, error) { ... dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, UpdateFunc: dc.updateDeployment, // This will enter the sync loop and no-op, because the deployment has been deleted from the store. DeleteFunc: dc.deleteDeployment, }) rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addReplicaSet, UpdateFunc: dc.updateReplicaSet, DeleteFunc: dc.deleteReplicaSet, }) podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: dc.deletePod, }) ... }