Kubernetes Informer 机制详解

核心概念

Informer 是 Kubernetes 中用于监听和缓存资源对象的核心机制,它通过 ListAndWatch 机制实现高效的资源监控。

核心组件及作用

1. Reflector(反射器)

  • 作用:负责从 Kubernetes API Server 获取资源对象
  • 功能
    • List:获取资源的全量数据
    • Watch:监听资源的增量变化
    • 将数据放入 Delta FIFO 队列

2. Delta FIFO Queue(增量队列)

  • 作用:存储资源对象的变化(增删改)
  • 特点
    • 保持操作顺序
    • 存储对象的增量变化(Delta)
    • 线程安全

3. Informer(通知器)

  • 作用:从 Delta FIFO 队列中取出对象并处理
  • 功能
    • 调用 Indexer 更新本地缓存
    • 触发注册的事件处理器

4. Indexer(索引器)

  • 作用:本地缓存,提供快速查询
  • 功能
    • 存储资源对象的本地副本
    • 提供基于索引的快速查找
    • 线程安全的读写操作

5. Resource Event Handlers(资源事件处理器)

  • 作用:处理资源对象的变化事件
  • 类型
    • AddFunc:对象添加时调用
    • UpdateFunc:对象更新时调用
    • DeleteFunc:对象删除时调用

工作流程示例

让我用一个监控 Pod 的例子来串起来整个流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 1. 创建 Informer Factory
factory := informers.NewSharedInformerFactory(clientset, time.Minute*10)

// 2. 获取 Pod Informer
podInformer := factory.Core().V1().Pods()

// 3. 注册事件处理器
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("Pod Added: %s/%s\n", pod.Namespace, pod.Name)
// 处理 Pod 创建事件
},
UpdateFunc: func(old, new interface{}) {
oldPod := old.(*v1.Pod)
newPod := new.(*v1.Pod)
fmt.Printf("Pod Updated: %s/%s\n", newPod.Namespace, newPod.Name)
// 处理 Pod 更新事件
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
fmt.Printf("Pod Deleted: %s/%s\n", pod.Namespace, pod.Name)
// 处理 Pod 删除事件
},
})

// 4. 启动 Informer
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)

// 5. 通过 Lister 查询对象
pods, err := podInformer.Lister().Pods("default").List(labels.Everything())

详细工作流程

第一阶段:初始化和全量同步

1
2
3
4
1. Reflector 发送 LIST 请求到 API Server
2. API Server 返回所有 Pod 资源
3. Reflector 将所有 Pod 对象包装成 "Added" Delta
4. 放入 Delta FIFO 队列

第二阶段:增量监听

1
2
3
4
1. Reflector 建立 Watch 连接
2. 当有 Pod 创建/更新/删除时,API Server 推送事件
3. Reflector 收到事件后包装成对应 Delta
4. 放入 Delta FIFO 阶段

第三阶段:处理和缓存

1
2
3
4
1. Informer 从 Delta FIFO 取出 Delta
2. 调用 Indexer 更新本地缓存
3. 触发注册的事件处理器
4. 应用程序可以通过 Lister 查询缓存

实际应用场景示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// 控制器示例:监控 Deployment 变化
type DeploymentController struct {
clientset kubernetes.Interface
informer cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
}

func NewDeploymentController(clientset kubernetes.Interface) *DeploymentController {
// 1. 创建 ListWatcher
listWatcher := cache.NewListWatchFromClient(
clientset.AppsV1().RESTClient(),
"deployments",
v1.NamespaceAll,
fields.Everything(),
)

// 2. 创建 Informer
informer := cache.NewSharedIndexInformer(
listWatcher,
&appsv1.Deployment{},
time.Minute*10,
cache.Indexers{},
)

controller := &DeploymentController{
clientset: clientset,
informer: informer,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

// 3. 注册事件处理器
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
controller.queue.Add(key)
}
},
UpdateFunc: func(old, new interface{}) {
// 检查是否有实质性变化
oldDeploy := old.(*appsv1.Deployment)
newDeploy := new.(*appsv1.Deployment)
if oldDeploy.ResourceVersion != newDeploy.ResourceVersion {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
controller.queue.Add(key)
}
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
controller.queue.Add(key)
}
},
})

return controller
}

// 4. 处理工作队列中的项目
func (c *DeploymentController) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

// 处理 Deployment 逻辑
err := c.syncDeployment(key.(string))
if err != nil {
c.queue.AddRateLimited(key)
} else {
c.queue.Forget(key)
}

return true
}

优势和特点

优势:

  1. 高效:减少 API Server 压力
  2. 实时:通过 Watch 机制实现实时同步
  3. 可靠:自动重连和错误处理
  4. 缓存:本地缓存提高查询效率

特点:

  1. 最终一致性:保证本地缓存与集群状态最终一致
  2. 增量更新:只处理变化的数据
  3. 事件驱动:基于事件的异步处理机制

总结

Informer 机制通过 Reflector → Delta FIFO → Informer → Indexer 的流水线处理,实现了高效的 Kubernetes 资源监控和缓存,是构建 Kubernetes 控制器和 Operator 的核心基础。