亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

zookeeper(12)源碼分析-請求處理鏈(2)

發布時間:2020-06-30 11:05:29 來源:網絡 閱讀:191 作者:shayang88 欄目:編程語言

SyncRequestProcessor,該處理器將請求存入磁盤,其將請求批量的存入磁盤以提高效率,請求在寫入磁盤之前是不會被轉發到下個處理器的。

類的核心屬性

SyncRequestProcessor維護了ZooKeeperServer實例,其用于獲取ZooKeeper的數據庫和其他信息;維護了一個處理請求的隊列,其用于存放請求;維護了一個處理快照的線程,用于處理快照;維護了一個running標識,標識SyncRequestProcessor是否在運行;同時還維護了一個等待被刷新到磁盤的請求隊列。

// Zookeeper服務器
    private final ZooKeeperServer zks;
    // 請求隊列
    private final LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();
    // 下個處理器
    private final RequestProcessor nextProcessor;
    // 快照處理線程
    private Thread snapInProcess = null;
    // 是否在運行中
    volatile private boolean running;

    /**
     * Transactions that have been written and are waiting to be flushed to
     * disk. Basically this is the list of SyncItems whose callbacks will be
     * invoked after flush returns successfully.
     */
    // 等待被刷新到磁盤的請求隊列
    private final LinkedList<Request> toFlush = new LinkedList<Request>();
    // 隨機數生成器
    private final Random r = new Random();
    /**
     * The number of log entries to log before starting a snapshot
     */
    // 快照個數
    private static int snapCount = ZooKeeperServer.getSnapCount();
    // 結束請求標識
    private final Request requestOfDeath = Request.requestOfDeath;

構造函數

構造函數首先會調用父類的構造函數,然后根據構造函數參數給類的屬性賦值,其中會確定下個處理器,并會設置該處理器正在運行的標識。

public SyncRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        running = true;
    }

核心方法

1、run

@Override
    public void run() {
        try {
            // 寫日志數量初始化為0
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            // 防止集群中所有機器在同一時刻進行數據快照,對是否進行數據快照增加隨機因素
            int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
                // 沒有需要刷新到磁盤的請求
                if (toFlush.isEmpty()) {
                    // 從請求隊列中取出一個請求,若queuedRequests隊列為空會阻塞
                    si = queuedRequests.take();
                } else {
                    // 從請求隊列中取出一個請求,若queuedRequests隊列為空,則返回空,不會阻塞
                    si = queuedRequests.poll();
                    // 取出的請求為空
                    if (si == null) {
                        // 刷新數據磁盤
                        flush(toFlush);
                        continue;
                    }
                }
                // 在關閉處理器之后,會添加requestOfDeath請求到queuedRequests隊列,表示關閉后不再處理請求
                if (si == requestOfDeath) {
                    break;
                }
                // 請求不為空,處理請求
                if (si != null) {
                    // track the number of records written to the log
                    // 將寫請求添加至事務日志文件 FileTxnSnapLog.append(si)
                    if (zks.getZKDatabase().append(si)) {
                        // 日志寫入,logCount加1
                        logCount++;
                                                //確定是否需要進行數據快照
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            // 滾動日志,從當前日志文件滾到下一個日志文件,不是回滾
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {  // 正在進行快照
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                // 創建線程來處理快照
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                // 進行快照
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                // 開始快照線程處理
                                snapInProcess.start();
                            }
                            // 重置為0
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {// 讀請求會走到這里,查看此時toFlush是否為空,如果為空,說明近段時間讀多寫少,直接響應
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            // 下個處理器開始處理請求
                            nextProcessor.proce***equest(si);
                            // 處理器是Flushable的,刷新數據到磁盤
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    // 將請求添加至被刷新至磁盤隊列
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {// 隊列大小大于1000,直接刷新到磁盤
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally{
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

2、flush

flush將toFlush隊列中的請求刷新到磁盤中。

 private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException
    {
        if (toFlush.isEmpty())
            return;
        // 提交事務至ZK數據庫
        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) {
            // 從隊列移除請求
            Request i = toFlush.remove();
            // 下個處理器開始處理請求
            if (nextProcessor != null) {
                nextProcessor.proce***equest(i);
            }
        }
        if (nextProcessor != null && nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }

3、shutdown

函數用于關閉SyncRequestProcessor處理器,其首先會在queuedRequests隊列中添加一個結束請求requestOfDeath,然后再判斷SyncRequestProcessor是否還在運行,若是,則會等待其結束;之后判斷toFlush隊列是否為空,若不為空,則刷新到磁盤中

public void shutdown() {
        LOG.info("Shutting down");
        // 添加結束請求請求至隊列
        queuedRequests.add(requestOfDeath);
        try {
            // 還在運行
            if(running){
                this.join();// 等待該線程終止
            }
            if (!toFlush.isEmpty()) {// 隊列不為空,刷新到磁盤
                flush(toFlush);
            }
        } catch(InterruptedException e) {
            LOG.warn("Interrupted while wating for " + this + " to finish");
        } catch (IOException e) {
            LOG.warn("Got IO exception during shutdown");
        } catch (RequestProcessorException e) {
            LOG.warn("Got request processor exception during shutdown");
        }
        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新乡市| 新密市| 泰顺县| 巨野县| 沙田区| 渭南市| 筠连县| 濮阳县| 泰来县| 翼城县| 苍溪县| 枣强县| 宁明县| 乌拉特后旗| 偏关县| 泌阳县| 广河县| 拜城县| 龙胜| 长岭县| 横山县| 迁西县| 文成县| 光泽县| 营口市| 土默特右旗| 嘉祥县| 浮梁县| 柳河县| 太白县| 平利县| 涟源市| 兴和县| 民丰县| 永新县| 怀安县| 湖南省| 馆陶县| 西华县| 江津市| 东港市|