Kubernetes Controller Manager原理浅析

controller manager是什么?

Controller Manager 是集群内部的管理控制中心,负责统一管理与运行不同的 Controller ,实现对集群内的 Node、Pod 等所有资源的管理。

比如当通过 Deployment 创建的某个 Pod 发生异常退出时,RS Controller 便会接受并处理该退出事件,并创建新的 Pod 来维持预期副本数。

controller manager起什么作用?

k8s内部几乎每种特定资源都有特定的 Controller 维护管理,而 Controller Manager 的职责便是把所有的 Controller 聚合起来:

  1. 提供基础设施降低 Controller 的实现复杂度
  2. 启动和维持 Controller 的正常运行,watch api-server,然后对不同的 Controller 分发事件通知。

k8s中有几十种 Controller,这里列举一些相对重要的Controller:

  1. 部署控制器(Deployment Controller):负责pod的滚动更新、回滚以及支持副本的水平扩容等。
  2. 节点控制器(Node Controller): 负责在节点出现故障时进行通知和响应。
  3. 副本控制器(Replication Controller): 负责为系统中的每个副本控制器对象维护正确数量的 Pod。
  4. 端点控制器(Endpoints Controller): 填充端点(Endpoints)对象(即加入 Service 与 Pod)。
  5. 服务帐户和令牌控制器(Service Account & Token Controllers): 为新的命名空间创建默认帐户和 API 访问令牌
  6. ……

controller manager的工作流程是什么?

大体流程:

从比较高维度的视角看,Controller Manager 主要提供了一个分发事件的能力,而不同的 Controller 只需要注册对应的 Handler 来等待接收和处理事件。

  1. List & Watch:
    1. Controller manager与api-server的通信主要通过两种方式:List 和 Watch。
    2. List是短连接实现,用于获取该资源的所有object;

    3. Watch是长连接实现,用于监听在List中获取的资源的变换。

    4. api-server检测到资源产生变更时,会主动通知到Controller manager(利用分块传输编码)。
    5. 也可以说,List获取的是全量数据,Watch获取的是增量数据。

  2. client-go:
    1. client-go实现统一管理每种 Controller 的List和Watch。
    2. 将收到的event事件放到缓存中,异步分发给每个 Controller 的注册的eventHandler。
  3. Controller自己的eventHandler如何注册?
    1. 以 Deployment Controller 举例来看源码。
    2. pkg/controller/deployment/deployment_controller.goNewDeploymentController 方法中,便包括了 Event Handler 的注册,对于 Deployment Controller 来说,只需要根据不同的事件实现不同的处理逻辑,便可以实现对相应资源的管理。
    3. Deployment Controller注册自己的eventHandler:

      dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc:    dc.addDeployment,
         UpdateFunc: dc.updateDeployment,
         DeleteFunc: dc.deleteDeployment,
      })

    4. staging/src/k8s.io/client-go/tools/cache/shared_informer.go:477 中AddEventHandler被封装成Listener并添加到数组中,并且调用了listener的run方法。

      func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
         // 省略了一些逻辑
         listener := newProcessListener(handler)
      
         if !s.started {
            s.processor.addListener(listener)
            return
         }
      }
      
      func (p *sharedProcessor) addListener(listener *processorListener) {
         p.listenersLock.Lock()
         defer p.listenersLock.Unlock()
      
         p.listeners = append(p.listeners, listener)
         p.syncingListeners = append(p.syncingListeners, listener)
         if p.listenersStarted {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
         }
      }

    5. listener的run方法,就是一个worker,消费nextCh:

      func (p *processorListener) run() {
         stopCh := make(chan struct{})
         wait.Until(func() {
            for next := range p.nextCh {
               switch notification := next.(type) {
               case updateNotification:
                  p.handler.OnUpdate(notification.oldObj, notification.newObj)
               case addNotification:
                  p.handler.OnAdd(notification.newObj)
               case deleteNotification:
                  p.handler.OnDelete(notification.oldObj)
               default:
                  utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
               }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            close(stopCh)
         }, 1*time.Second, stopCh)
      }

  4. Controller的eventHandler注册上了,client-go如何监听api-server并分发event事件的呢?
    1. kubernetes 在 github 上提供了一张 client-go 的架构图,从中可以看出,Controller 正是下半部分(CustomController)描述的内容,而 Controller Manager 主要完成的是上半部分。

    2. 每个Controller会启动一个Reflector,Reflector分发的源码如下:(staging/src/k8s.io/client-go/tools/cache/shared_informer.go:368

      func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
        // 这里省略很多代码……
       list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
        // ……
        w, err := r.listerWatcher.Watch(options)
         
        // ……    
        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {……}
      }
      
      func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
         
               switch event.Type {
               case watch.Added:
                  err := r.store.Add(event.Object)
                  if err != nil {
                     utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                  }
               case watch.Modified:
                  err := r.store.Update(event.Object)
                  if err != nil {
                     utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                  }
               case watch.Deleted:
                  // TODO: Will any consumers need access to the "last known
                  // state", which is passed in event.Object? If so, may need
                  // to change this.
                  err := r.store.Delete(event.Object)
                  if err != nil {
                     utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                  }
               case watch.Bookmark:
                  // A `Bookmark` means watch has synced here, just update the resourceVersion
               default:
                  utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
               }
               *resourceVersion = newResourceVersion
               r.setLastSyncResourceVersion(newResourceVersion)
               eventCount++
            }
         return nil
      }
      
      
      

    3. 最终调用r.store.Add(),实际上就是写入到了Delta Fifo queue中,然后有专门的goroutine来处理Delta Fifo queue中的数据,我们看下源码:

      func (c *controller) processLoop() {
         for {
      	// Pop阻塞式的从Delta Fifo queue弹出数据,然后交给Process方法处理。Process变量先前被HandleDeltas赋值,可以直接看HandleDeltas的实现。
            obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) 
            if err != nil {
               if err == ErrFIFOClosed {
                  return
               }
               if c.config.RetryOnError {
                  // This is the safe way to re-enqueue.
                  c.config.Queue.AddIfNotPresent(obj)
               }
            }
         }
      }

      func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
         s.blockDeltas.Lock()
         defer s.blockDeltas.Unlock()
      
         // from oldest to newest
         for _, d := range obj.(Deltas) {
            switch d.Type {
            case Sync, Replaced, Added, Updated:
               s.cacheMutationDetector.AddObject(d.Object)
               if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                  if err := s.indexer.Update(d.Object); err != nil {
                     return err
                  }
      
                  isSync := false
                  switch {
                  case d.Type == Sync:
                     // Sync events are only propagated to listeners that requested resync
                     isSync = true
                  case d.Type == Replaced:
                     if accessor, err := meta.Accessor(d.Object); err == nil {
                        if oldAccessor, err := meta.Accessor(old); err == nil {
                           // Replaced events that didn't change resourceVersion are treated as resync events
                           // and only propagated to listeners that requested resync
                           isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                        }
                     }
                  }
      			// 最终调用distribute处理shu
                  s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
               } else {
                  if err := s.indexer.Add(d.Object); err != nil {
                     return err
                  }
                  s.processor.distribute(addNotification{newObj: d.Object}, false)
               }
            case Deleted:
               if err := s.indexer.Delete(d.Object); err != nil {
                  return err
               }
               s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
            }
         }
         return nil
      }
      
      // 分发数据,
      func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
         p.listenersLock.RLock()
         defer p.listenersLock.RUnlock()
      
         if sync {
            for _, listener := range p.syncingListeners {
               listener.add(obj)
            }
         } else {
            for _, listener := range p.listeners {
      		// 继续调用add方法
               listener.add(obj)
            }
         }
      }
      
      
      func (p *processorListener) add(notification interface{}) {
         p.addCh <- notification
      }

    4. 可以看到,实际上事件通知是被写到了p.addCh中,我们之前的listener是消费的p.nextCh,这里实际上还有一个缓冲区,有专门的goroutine消费p.addCh,并向nextCh中填充事件通知。

      func (p *processorListener) pop() {
         defer utilruntime.HandleCrash()
         defer close(p.nextCh) // Tell .run() to stop
      
         var nextCh chan<- interface{}
         var notification interface{}
         for {
            select {
            case nextCh <- notification:
               // Notification dispatched
               var ok bool
               notification, ok = p.pendingNotifications.ReadOne()
               if !ok { // Nothing to pop
                  nextCh = nil // Disable this select case
               }
            case notificationToAdd, ok := <-p.addCh:
               if !ok {
                  return
               }
               if notification == nil { // No notification to pop (and pendingNotifications is empty)
                  // Optimize the case - skip adding to pendingNotifications
                  notification = notificationToAdd
                  nextCh = p.nextCh
               } else { // There is already a notification waiting to be dispatched
                  p.pendingNotifications.WriteOne(notificationToAdd)
               }
            }
         }
      }

参考资料:

https://blog.ihypo.net/15763910382218.html

https://zhuanlan.zhihu.com/p/59660536