How do you analyze the source code of NVIDIA's k8s-device-plugin? This article walks through the code in detail and answers that question step by step.
In the earlier article on how Kubernetes uses NVIDIA GPUs through Device Plugins, the working principle of NVIDIA/k8s-device-plugin was analyzed in depth. For convenience, here is its internal implementation diagram again:
The PreStartContainer and GetDevicePluginOptions interfaces can be ignored in NVIDIA/k8s-device-plugin: they are effectively empty implementations. Our focus is on the implementations of ListAndWatch and Allocate.
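For reference, the two no-op methods look roughly as follows. This is a sketch consistent with the upstream code of that era, not guaranteed verbatim:

// Both methods satisfy the v1beta1 DevicePluginServer interface but do no real work.
func (m *NvidiaDevicePlugin) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
	return &pluginapi.DevicePluginOptions{}, nil
}

func (m *NvidiaDevicePlugin) PreStartContainer(ctx context.Context, r *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
	return &pluginapi.PreStartContainerResponse{}, nil
}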
Everything starts from the main function! The core code is as follows:
func main() {
	log.Println("Loading NVML")
	if err := nvml.Init(); err != nil {
		select {}
	}
	...
	log.Println("Fetching devices.")
	if len(getDevices()) == 0 {
		select {}
	}

	log.Println("Starting FS watcher.")
	watcher, err := newFSWatcher(pluginapi.DevicePluginPath)
	if err != nil {
		os.Exit(1)
	}
	...
	log.Println("Starting OS watcher.")
	sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	restart := true
	var devicePlugin *NvidiaDevicePlugin

L:
	for {
		if restart {
			if devicePlugin != nil {
				devicePlugin.Stop()
			}

			devicePlugin = NewNvidiaDevicePlugin()
			if err := devicePlugin.Serve(); err != nil {
				...
			} else {
				restart = false
			}
		}

		select {
		case event := <-watcher.Events:
			if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
				restart = true
			}

		case err := <-watcher.Errors:

		case s := <-sigs:
			switch s {
			case syscall.SIGHUP:
				restart = true
			default:
				devicePlugin.Stop()
				break L
			}
		}
	}
}
The code needs little commentary; refer to the flow logic diagram below:
In the k8s-device-plugin startup flow, devicePlugin.Serve is responsible for starting the gRPC server and then registering the plugin with the kubelet.
// Serve starts the gRPC server and register the device plugin to Kubelet
func (m *NvidiaDevicePlugin) Serve() error {
	err := m.Start()
	if err != nil {
		log.Printf("Could not start device plugin: %s", err)
		return err
	}
	log.Println("Starting to serve on", m.socket)

	err = m.Register(pluginapi.KubeletSocket, resourceName)
	if err != nil {
		log.Printf("Could not register device plugin: %s", err)
		m.Stop()
		return err
	}
	log.Println("Registered device plugin with Kubelet")

	return nil
}
The code of Start is as follows:
// Start starts the gRPC server of the device plugin
func (m *NvidiaDevicePlugin) Start() error {
	err := m.cleanup()
	if err != nil {
		return err
	}

	sock, err := net.Listen("unix", m.socket)
	if err != nil {
		return err
	}

	m.server = grpc.NewServer([]grpc.ServerOption{}...)
	pluginapi.RegisterDevicePluginServer(m.server, m)

	go m.server.Serve(sock)

	// Wait for server to start by launching a blocking connexion
	conn, err := dial(m.socket, 5*time.Second)
	if err != nil {
		return err
	}
	conn.Close()

	go m.healthcheck()

	return nil
}
The deeper call chain is not covered here; the implementation logic diagram of Start is shown directly:
The Start flow is responsible for creating the nvidia.sock file.
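Both Start and Register depend on a small dial helper for connecting to a unix socket with a blocking, time-limited gRPC dial. It is not shown in the snippets above; a minimal sketch, consistent with the gRPC-Go API of that era (grpc.WithTimeout and grpc.WithDialer are deprecated in newer releases), might look like this:

// dial establishes a blocking gRPC connection over a unix socket,
// failing if the server does not answer within the timeout.
func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
	return grpc.Dial(unixSocketPath,
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithTimeout(timeout),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, timeout)
		}),
	)
}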
The healthcheck part deserves a special note:
healthcheck starts a goroutine that monitors the health of the managed devices. Once a device is found unhealthy, it is sent to the NvidiaDevicePlugin's health channel. The device plugin's ListAndWatch then reads these unhealthy devices from the health channel and notifies the kubelet so it can update its view.
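A minimal sketch of this pattern is shown below. watchXidEvents is a hypothetical stand-in for the NVML event-watching code, and the channel wiring mirrors the description above rather than the verbatim upstream source:

// Sketch only: forward devices that hit a critical XID event to m.health,
// where ListAndWatch picks them up and notifies kubelet.
func (m *NvidiaDevicePlugin) healthcheck() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	xids := make(chan *pluginapi.Device)
	go watchXidEvents(ctx, m.devs, xids)

	for {
		select {
		case <-m.stop:
			return
		case d := <-xids:
			m.health <- d
		}
	}
}

// watchXidEvents is hypothetical: the real code registers for
// nvmlEventTypeXidCriticalError on every device and emits the affected
// device when such an event fires.
func watchXidEvents(ctx context.Context, devs []*pluginapi.Device, xids chan<- *pluginapi.Device) {
	// ... NVML event registration and wait loop elided ...
}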
Only the nvmlEventTypeXidCriticalError event is monitored; as soon as this event is observed for a device, the device is considered unhealthy. For details on nvmlEventTypeXidCriticalError, refer to NVIDIA's NVML API documentation.
The healthcheck can be disabled by setting the environment variable DP_DISABLE_HEALTHCHECKS to "all" inside the NVIDIA device plugin Pod. If it is unset or set to any other value, the healthcheck runs; the default deployment leaves it unset.
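In code terms, the switch could be as simple as the following sketch; the exact parsing in the upstream source may differ:

// healthChecksDisabled is a hypothetical helper reporting whether the operator
// opted out of health monitoring via DP_DISABLE_HEALTHCHECKS.
func healthChecksDisabled() bool {
	return strings.ToLower(os.Getenv("DP_DISABLE_HEALTHCHECKS")) == "all"
}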
After Start, the flow enters Register, whose code is as follows:
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *NvidiaDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
	conn, err := dial(kubeletEndpoint, 5*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pluginapi.NewRegistrationClient(conn)
	reqt := &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     path.Base(m.socket),
		ResourceName: resourceName,
	}

	_, err = client.Register(context.Background(), reqt)
	if err != nil {
		return err
	}
	return nil
}
The implementation flow of Register is as follows:
The registered Resource Name is nvidia.com/gpu, and the registered Version is v1beta1.
The code of Stop is as follows:
// Stop stops the gRPC server
func (m *NvidiaDevicePlugin) Stop() error {
	if m.server == nil {
		return nil
	}

	m.server.Stop()
	m.server = nil
	close(m.stop)

	return m.cleanup()
}
The implementation flow of Stop is as follows:
The Stop flow is responsible for stopping the gRPC server and removing nvidia.sock.
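Both Start and Stop call the cleanup helper, which is not shown above. A plausible sketch is simply removing the socket file while tolerating its absence:

// cleanup removes the plugin's unix socket; a missing file is not an error.
func (m *NvidiaDevicePlugin) cleanup() error {
	if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) {
		return err
	}
	return nil
}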
The ListAndWatch interface mainly watches the health channel; once a GPU becomes unhealthy, it sends the full GPU list (IDs and health status) to the kubelet so it can update its state.
// ListAndWatch lists devices and update that list according to the health status
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})

	for {
		select {
		case <-m.stop:
			return nil
		case d := <-m.health:
			// FIXME: there is no way to recover from the Unhealthy state.
			d.Health = pluginapi.Unhealthy
			s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
		}
	}
}
The implementation flow of ListAndWatch is as follows:
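To make the streaming semantics concrete, here is a hedged sketch of how a client such as the kubelet consumes ListAndWatch over the plugin's unix socket. dial is the helper sketched earlier, and the socket path is illustrative:

// Sketch: subscribe to device updates from the plugin's gRPC stream.
func watchDevices() error {
	conn, err := dial(pluginapi.DevicePluginPath+"nvidia.sock", 5*time.Second)
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pluginapi.NewDevicePluginClient(conn)
	stream, err := client.ListAndWatch(context.Background(), &pluginapi.Empty{})
	if err != nil {
		return err
	}
	for {
		resp, err := stream.Recv() // blocks until the plugin sends an update
		if err != nil {
			return err
		}
		for _, d := range resp.Devices {
			log.Printf("device %s health=%s", d.ID, d.Health)
		}
	}
}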
Allocate handles the kubelet's requests to allocate GPUs for a Container. The request struct is as follows:
// - Allocate is expected to be called during pod creation since allocation
//   failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
//   environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
//   the Devices requested
type AllocateRequest struct {
	ContainerRequests []*ContainerAllocateRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests" json:"container_requests,omitempty"`
}

type ContainerAllocateRequest struct {
	DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs" json:"devicesIDs,omitempty"`
}
The device plugin's Allocate response struct is defined as follows:
// AllocateResponse includes the artifacts that needs to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2.
// Allocation on dev1 succeeds but allocation on dev2 fails.
// The Device plugin should send a ListAndWatch update and fail the
// Allocation request
type AllocateResponse struct {
	ContainerResponses []*ContainerAllocateResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses" json:"container_responses,omitempty"`
}

type ContainerAllocateResponse struct {
	// List of environment variable to be set in the container to access one of more devices.
	Envs map[string]string `protobuf:"bytes,1,rep,name=envs" json:"envs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	// Mounts for the container.
	Mounts []*Mount `protobuf:"bytes,2,rep,name=mounts" json:"mounts,omitempty"`
	// Devices for the container.
	Devices []*DeviceSpec `protobuf:"bytes,3,rep,name=devices" json:"devices,omitempty"`
	// Container annotations to pass to the container runtime
	Annotations map[string]string `protobuf:"bytes,4,rep,name=annotations" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
The implementation of Allocate is as follows:
// Allocate which return list of devices.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	devs := m.devs
	responses := pluginapi.AllocateResponse{}
	for _, req := range reqs.ContainerRequests {
		response := pluginapi.ContainerAllocateResponse{
			Envs: map[string]string{
				"NVIDIA_VISIBLE_DEVICES": strings.Join(req.DevicesIDs, ","),
			},
		}

		for _, id := range req.DevicesIDs {
			if !deviceExists(devs, id) {
				return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
			}
		}

		responses.ContainerResponses = append(responses.ContainerResponses, &response)
	}

	return &responses, nil
}
Below is its implementation logic diagram:
Allocate iterates over ContainerRequests and packs the DeviceIDs into the ContainerAllocateResponse env NVIDIA_VISIBLE_DEVICES, in the format "${ID_1},${ID_2},...". Beyond that, it does not populate Mounts, Devices, or Annotations; the nvidia container runtime reads NVIDIA_VISIBLE_DEVICES inside the container and exposes exactly those GPUs.
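A hypothetical in-package test sketch illustrates this behavior; the devs field values and device IDs below are made up for the example, and no real GPU is needed:

func TestAllocateSetsVisibleDevices(t *testing.T) {
	plugin := &NvidiaDevicePlugin{
		devs: []*pluginapi.Device{
			{ID: "GPU-aaaa", Health: pluginapi.Healthy},
			{ID: "GPU-bbbb", Health: pluginapi.Healthy},
		},
	}

	resp, err := plugin.Allocate(context.Background(), &pluginapi.AllocateRequest{
		ContainerRequests: []*pluginapi.ContainerAllocateRequest{
			{DevicesIDs: []string{"GPU-aaaa", "GPU-bbbb"}},
		},
	})
	if err != nil {
		t.Fatal(err)
	}

	// Only Envs is populated; Mounts, Devices and Annotations stay empty.
	got := resp.ContainerResponses[0].Envs["NVIDIA_VISIBLE_DEVICES"]
	if got != "GPU-aaaa,GPU-bbbb" {
		t.Fatalf("unexpected NVIDIA_VISIBLE_DEVICES: %q", got)
	}
}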
This concludes the analysis of the NVIDIA k8s-device-plugin source code; hopefully the walkthrough above is helpful.