Kubernetes Informer 机制详解
核心概念
Informer 是 Kubernetes 中用于监听和缓存资源对象的核心机制,它通过 ListAndWatch 机制实现高效的资源监控。
核心组件及作用
1. Reflector(反射器)
- 作用:负责从 Kubernetes API Server 获取资源对象
- 功能:
- List:获取资源的全量数据
- Watch:监听资源的增量变化
- 将数据放入 Delta FIFO 队列
2. Delta FIFO Queue(增量队列)
- 作用:存储资源对象的变化(增删改)
- 特点:
- 保持操作顺序
- 存储对象的增量变化(Delta)
- 线程安全
- 作用:从 Delta FIFO 队列中取出对象并处理
- 功能:
- 调用 Indexer 更新本地缓存
- 触发注册的事件处理器
4. Indexer(索引器)
- 作用:本地缓存,提供快速查询
- 功能:
- 存储资源对象的本地副本
- 提供基于索引的快速查找
- 线程安全的读写操作
5. Resource Event Handlers(资源事件处理器)
- 作用:处理资源对象的变化事件
- 类型:
AddFunc
:对象添加时调用
UpdateFunc
:对象更新时调用
DeleteFunc
:对象删除时调用
工作流程示例
让我用一个监控 Pod 的例子来串起来整个流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)
podInformer := factory.Core().V1().Pods()
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*v1.Pod) fmt.Printf("Pod Added: %s/%s\n", pod.Namespace, pod.Name) }, UpdateFunc: func(old, new interface{}) { oldPod := old.(*v1.Pod) newPod := new.(*v1.Pod) fmt.Printf("Pod Updated: %s/%s\n", newPod.Namespace, newPod.Name) }, DeleteFunc: func(obj interface{}) { pod := obj.(*v1.Pod) fmt.Printf("Pod Deleted: %s/%s\n", pod.Namespace, pod.Name) }, })
factory.Start(stopCh) factory.WaitForCacheSync(stopCh)
pods, err := podInformer.Lister().Pods("default").List(labels.Everything())
|
详细工作流程
第一阶段:初始化和全量同步
1 2 3 4
| 1. Reflector 发送 LIST 请求到 API Server 2. API Server 返回所有 Pod 资源 3. Reflector 将所有 Pod 对象包装成 "Added" Delta 4. 放入 Delta FIFO 队列
|
第二阶段:增量监听
1 2 3 4
| 1. Reflector 建立 Watch 连接 2. 当有 Pod 创建/更新/删除时,API Server 推送事件 3. Reflector 收到事件后包装成对应 Delta 4. 放入 Delta FIFO 阶段
|
第三阶段:处理和缓存
1 2 3 4
| 1. Informer 从 Delta FIFO 取出 Delta 2. 调用 Indexer 更新本地缓存 3. 触发注册的事件处理器 4. 应用程序可以通过 Lister 查询缓存
|
实际应用场景示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| type DeploymentController struct { clientset kubernetes.Interface informer cache.SharedIndexInformer queue workqueue.RateLimitingInterface }
func NewDeploymentController(clientset kubernetes.Interface) *DeploymentController { listWatcher := cache.NewListWatchFromClient( clientset.AppsV1().RESTClient(), "deployments", v1.NamespaceAll, fields.Everything(), ) informer := cache.NewSharedIndexInformer( listWatcher, &appsv1.Deployment{}, time.Minute*10, cache.Indexers{}, ) controller := &DeploymentController{ clientset: clientset, informer: informer, queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), } informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { controller.queue.Add(key) } }, UpdateFunc: func(old, new interface{}) { oldDeploy := old.(*appsv1.Deployment) newDeploy := new.(*appsv1.Deployment) if oldDeploy.ResourceVersion != newDeploy.ResourceVersion { key, err := cache.MetaNamespaceKeyFunc(new) if err == nil { controller.queue.Add(key) } } }, DeleteFunc: func(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { controller.queue.Add(key) } }, }) return controller }
func (c *DeploymentController) processNextItem() bool { key, quit := c.queue.Get() if quit { return false } defer c.queue.Done(key) err := c.syncDeployment(key.(string)) if err != nil { c.queue.AddRateLimited(key) } else { c.queue.Forget(key) } return true }
|
优势和特点
优势:
- 高效:减少 API Server 压力
- 实时:通过 Watch 机制实现实时同步
- 可靠:自动重连和错误处理
- 缓存:本地缓存提高查询效率
特点:
- 最终一致性:保证本地缓存与集群状态最终一致
- 增量更新:只处理变化的数据
- 事件驱动:基于事件的异步处理机制
总结
Informer 机制通过 Reflector → Delta FIFO → Informer → Indexer 的流水线处理,实现了高效的 Kubernetes 资源监控和缓存,是构建 Kubernetes 控制器和 Operator 的核心基础。