NVIDIA device plugin for Kubernetes原理分析
什么是 Device Plugin
K8s 原生并没有支持第三方设备厂商的物理设备资源,因此 Device Plugins 给第三方设备厂商提供了相关接口,可以让他们的物理设备资源以 Extended Resources 提供给底层的容器。
当 device plugin 功能启动后,可以令 kubelet 开放 Register 的 gRPC 服务,device plugin 就可以通过这个服务向 kubelet 进行注册,注册成功后 device plugin 就进入了 Serving 模式,提供前面提到的 gRPC 接口调用服务,kubelet 也就可以通过调用 Listandwatch
、Allocate
等方法对设备进行操作,可以用下图来描述单一节点上这一过程:

下面以 NVIDIA k8s-device-plugin 为例简单讲讲这一过程。
注册服务
先看 gRPC 注册部分,下面的函数用于启动一个 gRPC 服务器并在 kubelet 中注册
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| func (plugin *NvidiaDevicePlugin) Start() error { plugin.initialize()
if err := plugin.waitForMPSDaemon(); err != nil { return fmt.Errorf("error waiting for MPS daemon: %w", err) }
err := plugin.Serve() if err != nil { klog.Infof("Could not start device plugin for '%s': %s", plugin.rm.Resource(), err) plugin.cleanup() return err } klog.Infof("Starting to serve '%s' on %s", plugin.rm.Resource(), plugin.socket)
err = plugin.Register() if err != nil { klog.Infof("Could not register device plugin: %s", err) return errors.Join(err, plugin.Stop()) } klog.Infof("Registered device plugin for '%s' with Kubelet", plugin.rm.Resource())
go func() { err := plugin.rm.CheckHealth(plugin.stop, plugin.health) if err != nil { klog.Infof("Failed to start health check: %v; continuing with health checks disabled", err) } }()
return nil }
|
其中 Register
就是将资源注册给本机的 kubelet,注册请求包括 版本号、socket 名称、资源名称(如 nvidia.com/gpu)以及一些额外参数。注册好以后,kubelet 就可以通过 gRPC 来调用现在启用的 RPC server 中的一些服务,具体 proto 定义如下
1 2 3 4 5 6 7 8 9 10 11 12
| service DevicePlugin {
rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
rpc GetPreferredAllocation(PreferredAllocationRequest) returns (PreferredAllocationResponse) {}
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {} }
|
下文是的一些重点 rpc。
ListAndWatch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (plugin *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil { return err }
for { select { case <-plugin.stop: return nil case d := <-plugin.health: d.Health = pluginapi.Unhealthy klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID) if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil { return nil } } } }
|
ListAndWatch
负责发送 GPU list 信息,同时在 GPU 健康状态发生变化时,再次发送进行更新。
Allocate
allocate
是 kubelet 在创建底层容器时会调用的操作,从而使创建的容器能够使用相应的资源,其代码如下,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| func (plugin *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { responses := pluginapi.AllocateResponse{} for _, req := range reqs.ContainerRequests { if err := plugin.rm.ValidateRequest(req.DevicesIDs); err != nil { return nil, fmt.Errorf("invalid allocation request for %q: %w", plugin.rm.Resource(), err) } response, err := plugin.getAllocateResponse(req.DevicesIDs) if err != nil { return nil, fmt.Errorf("failed to get allocate response: %v", err) } responses.ContainerResponses = append(responses.ContainerResponses, response) }
return &responses, nil }
func (plugin *NvidiaDevicePlugin) getAllocateResponse(requestIds []string) (*pluginapi.ContainerAllocateResponse, error) { deviceIDs := plugin.deviceIDsFromAnnotatedDeviceIDs(requestIds)
response := &pluginapi.ContainerAllocateResponse{ Envs: make(map[string]string), } if plugin.deviceListStrategies.IsCDIEnabled() { responseID := uuid.New().String() if err := plugin.updateResponseForCDI(response, responseID, deviceIDs...); err != nil { return nil, fmt.Errorf("failed to get allocate response for CDI: %v", err) } } if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyEnvvar) { plugin.updateResponseForDeviceListEnvvar(response, deviceIDs...) } if plugin.deviceListStrategies.Includes(spec.DeviceListStrategyVolumeMounts) { plugin.updateResponseForDeviceMounts(response, deviceIDs...) } if plugin.config.Sharing.SharingStrategy() == spec.SharingStrategyMPS { plugin.updateResponseForMPS(response) } if *plugin.config.Flags.Plugin.PassDeviceSpecs { response.Devices = append(response.Devices, plugin.apiDeviceSpecs(*plugin.config.Flags.NvidiaDriverRoot, requestIds)...) } if *plugin.config.Flags.GDSEnabled { response.Envs["NVIDIA_GDS"] = "enabled" } if *plugin.config.Flags.MOFEDEnabled { response.Envs["NVIDIA_MOFED"] = "enabled" } return response, nil }
|
可以看出,其核心部分就是将请求中的 DeviceID 封装到 Envs:NVIDIA_VISIBLE_DEVICES
,其实就是改环境变量 NVIDIA_VISIBLE_DEVICES
,之后结合GPU 容器底层实现即可创建对应的容器。
总体流程
具体来讲,含 GPU 需求的 pod 在包含 GPU 设备的 K8s 集群上的生命周期如下:
- 各 Node 上的 kubelet 和各设备的 device plugin 交互完成注册,并通过
ListAndWatch
方法获得可用设备的资源信息,并将资源信息同步至集群 apiserver 中;
- 含 GPU 需求的 pod 创建后,会根据在 apiserver 中的信息选取合适的节点进行调度至某个节点;
- 节点上的 kubelet 获取 pod 信息后,会先根据 kubelet 本身的缓存选取节点上符合 pod 需求的具体设备,再根据设备 id 向 device plugin 发送
Allocate
调用,获得相应的信息并进行存储,kubelet 中有一个专门存储 pod 与设备信息的 map podDevices
(在 ManagerImpl
中);
- Kubelet 在创建 pod 的容器时,就会根据这些信息创建出能够使用所需设备的容器。
Allocate
中的相关参数如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| message AllocateResponse { repeated ContainerAllocateResponse container_responses = 1; }
message ContainerAllocateResponse { map<string, string> envs = 1; repeated Mount mounts = 2; repeated DeviceSpec devices = 3; map<string, string> annotations = 4; repeated CDIDevice cdi_devices = 5 [(gogoproto.customname) = "CDIDevices"]; }
message AllocateRequest { repeated ContainerAllocateRequest container_requests = 1; }
message ContainerAllocateRequest { repeated string devices_ids = 1 [(gogoproto.customname) = "DevicesIDs"]; }
|
这些设备参数主要用于帮助容器启动时能够使用这些设备。
当 kubelet 要创建 pod 的容器时,就会从中读取这些参数来进行参数指定,最终能够使用这些设备。