亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

# IT明星不是夢 # 圖解kubernetes資源擴展機制

發布時間:2020-02-27 11:10:57 來源:網絡 閱讀:230 作者:sdxin 欄目:云計算

k8s目前主要支持CPU和內存兩種資源,為了支持用戶需要按需分配的其他硬件類型的資源的調度分配,k8s實現了設備插件框架(device plugin framework)來用于其他硬件類型的資源集成,比如現在機器學習要使用GPU等資源,今天來看下其內部的關鍵實現

1. 基礎概念

# IT明星不是夢 # 圖解kubernetes資源擴展機制

1.1 集成方式

1.1.1 DaemonSet與服務

當我們要集成本地硬件的資源的時候,我們可以在當前節點上通過DaemonSet來運行一個GRPC服務,通過這個服務來進行本地硬件資源的上報與分配

1.1.2 服務注冊設計

當提供硬件服務需要與kubelet進行通信的時候,則首先需要進行注冊,注冊的方式,則是通過最原始的底層的socket文件,并且通過Linux文件系統的inotify機制,來實現服務的注冊

1.2 插件服務感知

# IT明星不是夢 # 圖解kubernetes資源擴展機制

1.2.1 Watcher

Watcher主要是負責感知當前節點上注冊的服務,當發現新的要注冊的插件服務,則會產生對應的事件,注冊到當前的kubelet中

1.2.2 期望狀態與實際狀態

這里的狀態主要是指的是否需要注冊,因為kubelet與對應的插件服務是通過網絡進行通信的,當網絡出現問題、或者對應的插件服務故障,則可能會導致服務注冊失敗,但此時對應的服務的socket還依舊存在,即對應的插件服務依舊存在

此時就會有兩種狀態:期望狀態與實際狀態, 因為socket存在所以服務的期望狀態其實是需要注冊這個插件服務,但是實際上因為某些原因,這個插件服務并沒有完成注冊,后續會不斷的通過期望狀態,調整實際狀態,從而達到一致

1.2.3 協調器

協調器則就是完成上述兩種狀態之間操作的核心,其通過調用對應插件的回調函數,其實就是調用對應的grpc接口,來完成期望狀態與實際狀態的一致性

1.2.4 插件控制器

針對每種類型的插件,都會有對應的控制器,其實也就是實現對應設備注冊和反注冊并且完成底層資源的分配(Allocate)和收集(ListWatch)操作

2. 插件服務發現

# IT明星不是夢 # 圖解kubernetes資源擴展機制

2.1 核心數據結構

type Watcher struct {
    // 感知插件服務注冊的socket的路徑
    path                string
    fs                  utilfs.Filesystem
    // inotify監測插件服務socket變化
    fsWatcher           *fsnotify.Watcher
    stopped             chan struct{}
    // 存儲期望狀態
    desiredStateOfWorld cache.DesiredStateOfWorld
}

2.2 初始化

初始化其實就是創建對應的目錄

func (w *Watcher) init() error {
    klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path)

    if err := w.fs.MkdirAll(w.path, 0755); err != nil {
        return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
    }

    return nil
}

2.3 插件服務發現核心

    go func(fsWatcher *fsnotify.Watcher) {
        defer close(w.stopped)
        for {
            select {
            case event := <-fsWatcher.Events:
                //如果發現對應目錄的文件的變化,則會觸發對應的事件
                if event.Op&fsnotify.Create == fsnotify.Create {
                    err := w.handleCreateEvent(event)
                    if err != nil {
                        klog.Errorf("error %v when handling create event: %s", err, event)
                    }
                } else if event.Op&fsnotify.Remove == fsnotify.Remove {
                    w.handleDeleteEvent(event)
                }
                continue
            case err := <-fsWatcher.Errors:
                if err != nil {
                    klog.Errorf("fsWatcher received error: %v", err)
                }
                continue
            case <-stopCh:
                // In case of plugin watcher being stopped by plugin manager, stop
                // probing the creation/deletion of plugin sockets.
                // Also give all pending go routines a chance to complete
                select {
                case <-w.stopped:
                case <-time.After(11 * time.Second):
                    klog.Errorf("timeout on stopping watcher")
                }
                w.fsWatcher.Close()
                return
            }
        }
    }(fsWatcher)

2.4 補償機制

其實補償機制主要是在重新啟動kubelet的時候,需要將之前已經存在的socket重新注冊到當前的kubelet中

func (w *Watcher) traversePluginDir(dir string) error {
    return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            if path == dir {
                return fmt.Errorf("error accessing path: %s error: %v", path, err)
            }

            klog.Errorf("error accessing path: %s error: %v", path, err)
            return nil
        }

        switch mode := info.Mode(); {
        case mode.IsDir():
            if err := w.fsWatcher.Add(path); err != nil {
                return fmt.Errorf("failed to watch %s, err: %v", path, err)
            }
        case mode&os.ModeSocket != 0:
            event := fsnotify.Event{
                Name: path,
                Op:   fsnotify.Create,
            }
            //TODO: Handle errors by taking corrective measures
            if err := w.handleCreateEvent(event); err != nil {
                klog.Errorf("error %v when handling create event: %s", err, event)
            }
        default:
            klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
        }

        return nil
    })
}

2.5 注冊事件回調

注冊其實就只需要感知到的socket文件路徑傳遞給期望狀態進行管理

func (w *Watcher) handlePluginRegistration(socketPath string) error {
    if runtime.GOOS == "windows" {
        socketPath = util.NormalizePath(socketPath)
    }
    // 調用期望狀態進行更新
    klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
    err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
    if err != nil {
        return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
    }
    return nil
}

2.6 刪除事件回調

注冊其實就只需要感知到的socket文件路徑傳遞給期望狀態進行管理

func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
    klog.V(6).Infof("Handling delete event: %v", event)

    socketPath := event.Name
    klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
    w.desiredStateOfWorld.RemovePlugin(socketPath)
}

3.期望狀態與實際狀態

3.1 插件信息

插件信息其實只是存儲了對應socket的路徑和最近更新的時間

type PluginInfo struct {
    SocketPath string
    Timestamp  time.Time
}

3.2 期望狀態

期望狀態與實際狀態在數據結構上都是一樣的,因為本質上只是為了存儲插件的當前的狀態信息,即更新時間,這里不在贅述

type desiredStateOfWorld struct {
    socketFileToInfo map[string]PluginInfo
    sync.RWMutex
}
type actualStateOfWorld struct {

    socketFileToInfo map[string]PluginInfo
    sync.RWMutex
}

4.OperationExecutor

目前k8s中支持兩大類的插件的管理一類是DevicePlugin即我們本文說的這些都是這種概念,一類是CSIPlugin,其中針對每一類DRiver的處理其實內部都是不一樣的,那其實在操作之前就要先感知到當前的Driver是那種類型的

OperationExecutor主要就是做這件事的,其根據不同的plugin類型,生成不同的要執行的操作,即對應的Plugin類型獲取對應的handler,就生成了一個要執行的操作

4.1 生成注冊插件回調函數

# IT明星不是夢 # 圖解kubernetes資源擴展機制

4.1.1 通過socket連接對應的插件服務

    registerPluginFunc := func() error {
        client, conn, err := dial(socketPath, dialTimeoutDuration)
        if err != nil {
            return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
        }
        defer conn.Close()

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
        if err != nil {
            return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
        }

4.1.2 根據插件類型驗證服務


        handler, ok := pluginHandlers[infoResp.Type]
        if !ok {
            if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
                return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
            }
            return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
        }

        if infoResp.Endpoint == "" {
            infoResp.Endpoint = socketPath
        }
        if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
            if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
                return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
            }
            return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed")
        }

4.1.3 注冊插件到實際狀態

        err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
            SocketPath: socketPath,
            Timestamp:  timestamp,
        })
        if err != nil {
            klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)
        }
            // 調用插件的注冊回調函數
        if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
            return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
        }

4.1.4 通知對應的服務注冊成功


        if err := og.notifyPlugin(client, true, ""); err != nil {
            return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
        }

4.2 通過socket構建注冊client

func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
        grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
            return (&net.Dialer{}).DialContext(ctx, "unix", addr)
        }),
    )

    if err != nil {
        return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
    }

    return registerapi.NewRegistrationClient(c), c, nil
}

今天就先到這里,下一章會繼續介紹如何組合上述組件以及默認的回調管理機制的實現,進探究到這里謝謝大家,感謝分享點贊,反轉又不花錢

k8s源碼閱讀電子書地址: https://www.yuque.com/baxiaoshi/tyado3

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

托克逊县| 龙南县| 临洮县| 扎鲁特旗| 河北省| 和政县| 高青县| 苏尼特右旗| 嘉定区| 安宁市| 阿拉善盟| 师宗县| 射阳县| 常宁市| 苏州市| 商河县| 崇州市| 岳普湖县| 海城市| 焦作市| 浦北县| 南澳县| 越西县| 双牌县| 广汉市| 盘锦市| 赞皇县| 汶川县| 五华县| 准格尔旗| 辛集市| 资源县| 南皮县| 平南县| 鄂尔多斯市| 文山县| 芦溪县| 克拉玛依市| 博白县| 扶风县| 多伦县|