亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ中怎么對DLedger進行整合

發布時間:2021-06-18 15:49:55 來源:億速云 閱讀:452 作者:Leah 欄目:大數據

RocketMQ中怎么對DLedger進行整合,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

1、閱讀源碼之前的思考

RocketMQ 的消息存儲文件主要包括 commitlog 文件、consumequeue 文件與 Index 文件。commitlog 文件存儲全量的消息,consumequeue、index 文件都是基于 commitlog 文件構建的。要使用 DLedger 來實現消息存儲的一致性,應該關鍵是要實現 commitlog 文件的一致性,即 DLedger 要整合的對象應該是 commitlog 文件,即只需保證 raft 協議的復制組內各個節點的 commitlog 文件一致即可。

我們知道使用文件存儲消息都會基于一定的存儲格式,rocketmq 的 commitlog 一個條目就包含魔數、消息長度,消息屬性、消息體等,而我們再來回顧一下 DLedger 日志的存儲格式: RocketMQ中怎么對DLedger進行整合

DLedger 要整合 commitlog 文件,是不是可以把 rocketmq 消息,即一個個 commitlog 條目整體當成 DLedger 的 body 字段即可。

還等什么,跟我一起來看源碼吧!!!別急,再拋一個問題,DLedger 整合 RocketMQ commitlog,能不能做到平滑升級?

帶著這些思考和問題,一起來探究 DLedger 是如何整合 RocketMQ 的。

2、從 Broker 啟動流程看 DLedger

> 溫馨提示:本文不會詳細介紹 Broker 端的啟動流程,只會點出在啟動過程中與 DLedger 相關的代碼,如想詳細了解 Broker 的啟動流程,建議關注筆者的《RocketMQ技術內幕》一書。

Broker 涉及到 DLedger 相關關鍵點如下: RocketMQ中怎么對DLedger進行整合

2.1 構建 DefaultMessageStore

DefaultMessageStore 構造方法

if(messageStoreConfig.isEnableDLegerCommitLog()) {  // [@1](https://my.oschina.net/u/1198)
    this.commitLog = new DLedgerCommitLog(this);
 else {
    this.commitLog = new CommitLog(this);                    // @2
}

代碼@1:如果開啟 DLedger ,commitlog 的實現類為 DLedgerCommitLog,也是本文需要關注的關鍵所在。

代碼@2:如果未開啟 DLedger,則使用舊版的 Commitlog實現類。

2.2 增加節點狀態變更事件監聽器

BrokerController#initialize

if (messageStoreConfig.isEnableDLegerCommitLog()) {
    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}

主要調用 LedgerLeaderElector 的 addRoleChanneHandler 方法增加 節點角色變更事件監聽器,DLedgerRoleChangeHandler 是實現主從切換的另外一個關鍵點。

2.3 調用 DefaultMessageStore 的 load 方法

DefaultMessageStore#load

// load Commit Log
result = result && this.commitLog.load();   // [@1](https://my.oschina.net/u/1198)
// load Consume Queue
result = result && this.loadConsumeQueue();  
if (result) {
    this.storeCheckpoint =  new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
    this.indexService.load(lastExitOK);
    this.recover(lastExitOK);                         // @2
    log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}

代碼@1、@2 最終都是委托 commitlog 對象來執行,這里的關鍵又是如果開啟了 DLedger,則最終調用的是 DLedgerCommitLog。

經過上面的鋪墊,主角 DLedgerCommitLog “閃亮登場“了。

3、DLedgerCommitLog 詳解

> 溫馨提示:由于 Commitlog 的絕大部分方法都已經在《RocketMQ技術內幕》一書中詳細介紹了,并且 DLedgerCommitLog 的實現原理與 Commitlog 文件的實現原理類同,本文會一筆帶過關于存儲部分的實現細節。

3.1 核心類圖

RocketMQ中怎么對DLedger進行整合

DLedgerCommitlog 繼承自 Commitlog。讓我們一一來看一下它的核心屬性。

  • DLedgerServer dLedgerServer 基于 raft 協議實現的集群內的一個節點,用 DLedgerServer 實例表示。

  • DLedgerConfig dLedgerConfig DLedger 的配置信息。

  • DLedgerMmapFileStore dLedgerFileStore DLedger 基于文件映射的存儲實現。

  • MmapFileList dLedgerFileList DLedger 所管理的存儲文件集合,對比 RocketMQ 中的 MappedFileQueue。

  • int id 節點ID,0 表示主節點,非0表示從節點

  • MessageSerializer messageSerializer 消息序列器。

  • long beginTimeInDledgerLock = 0 用于記錄 消息追加的時耗(日志追加所持有鎖時間)。

  • long dividedCommitlogOffset = -1 記錄的舊 commitlog 文件中的最大偏移量,如果訪問的偏移量大于它,則訪問 dledger 管理的文件。

  • boolean isInrecoveringOldCommitlog = false 是否正在恢復舊的 commitlog 文件。

接下來我們將詳細介紹 DLedgerCommitlog 各個核心方法及其實現要點。

3.2 構造方法

public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
    super(defaultMessageStore);                   // @1
    dLedgerConfig =  new DLedgerConfig();
    dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
    dLedgerConfig.setStoreType(DLedgerConfig.FILE);
    dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
    dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
    dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
    dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
    dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
    dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);  
    id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;            // @2
    dLedgerServer = new DLedgerServer(dLedgerConfig);                           // @3
    dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
    DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
            assert bodyOffset == DLedgerEntry.BODY_OFFSET;
            buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
            buffer.putLong(entry.getPos() + bodyOffset);
    };
    dLedgerFileStore.addAppendHook(appendHook);   // @4
    dLedgerFileList = dLedgerFileStore.getDataFileList();
    this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());   // @5
}

代碼@1:調用父類 即 CommitLog 的構造函數,加載 ${ROCKETMQ_HOME}/store/ comitlog 下的 commitlog 文件,以便兼容升級 DLedger 的消息。我們稍微看一下 CommitLog 的構造函數: RocketMQ中怎么對DLedger進行整合

代碼@2:構建 DLedgerConfig 相關配置屬性,其主要屬性如下:

  • enableDiskForceClean 是否強制刪除文件,取自 broker 配置屬性 cleanFileForciblyEnable,默認為 true 。

  • storeType DLedger 存儲類型,固定為 基于文件的存儲模式。

  • dLegerSelfId leader 節點的 id 名稱,示例配置:n0,其配置要求第二個字符后必須是數字。

  • dLegerGroup DLeger group 的名稱,建議與 broker 配置屬性 brokerName 保持一致。

  • dLegerPeers DLeger Group 中所有的節點信息,其配置示例 n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913。多個節點使用分號隔開。

  • storeBaseDir 設置 DLedger 的日志文件的根目錄,取自 borker 配件文件中的 storePathRootDir ,即 RocketMQ 的數據存儲根路徑。

  • mappedFileSizeForEntryData 設置 DLedger 的單個日志文件的大小,取自 broker 配置文件中的 - mapedFileSizeCommitLog,即與 commitlog 文件的單個文件大小一致。

  • deleteWhen DLedger 日志文件的刪除時間,取自 broker 配置文件中的 deleteWhen,默認為凌晨 4點。

  • fileReservedHours DLedger 日志文件保留時長,取自 broker 配置文件中的 fileReservedHours,默認為 72h。

代碼@3:根據 DLedger 配置信息創建 DLedgerServer,即創建 DLedger 集群節點,集群內各個節點啟動后,就會觸發選主。

代碼@4:構建 appendHook 追加鉤子函數,這是兼容 Commitlog 文件很關鍵的一步,后面會詳細介紹其作用。

代碼@5:構建消息序列化。

根據上述的流程圖,構建好 DefaultMessageStore 實現后,就是調用其 load 方法,在啟用 DLedger 機制后,會依次調用 DLedgerCommitlog 的 load、recover 方法。

3.3 load

public boolean load() {
    boolean result = super.load();
    if (!result) {
        return false;
    }
    return true;
}

DLedgerCommitLog 的 laod 方法實現比較簡單,就是調用 其父類 Commitlog 的 load 方法,即這里也是為了啟用 DLedger 時能夠兼容以前的消息。

3.4 recover

在 Broker 啟動時會加載 commitlog、consumequeue等文件,需要恢復其相關是數據結構,特別是與寫入、刷盤、提交等指針,其具體調用 recover 方法。 DLedgerCommitLog#recover

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {  // @1
    recover(maxPhyOffsetOfConsumeQueue);
}

首先會先恢復 consumequeue,得出 consumequeue 中記錄的最大有效物理偏移量,然后根據該物理偏移量進行恢復。 接下來看一下該方法的處理流程與關鍵點。

DLedgerCommitLog#recover

dLedgerFileStore.load();

Step1:加載 DLedger 相關的存儲文件,并一一構建對應的 MmapFile,其初始化三個重要的指針 wrotePosition、flushedPosition、committedPosition 三個指針為文件的大小。

DLedgerCommitLog#recover

if (dLedgerFileList.getMappedFiles().size() > 0) {   
    dLedgerFileStore.recover();   // @1
    dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset();     // @2
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    if (mappedFile != null) {                                                                                                       // @3
        disableDeleteDledger();
    }
    long maxPhyOffset = dLedgerFileList.getMaxWrotePosition();
    // Clear ConsumeQueue redundant data
    if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) {      // @4
        log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset);
        this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset);
    }
    return;
}

Step2:如果已存在 DLedger 的數據文件,則只需要恢復 DLedger 相關數據文建,因為在加載舊的 commitlog 文件時已經將其重要的數據指針設置為最大值。其關鍵實現點如下:

  • 首先調用 DLedger 文件存儲實現類 DLedgerFileStore 的 recover 方法,恢復管轄的 MMapFile 對象(一個文件對應一個MMapFile實例)的相關指針,其實現方法與 RocketMQ 的 DefaultMessageStore 的恢復過程類似。

  • 設置 dividedCommitlogOffset 的值為 DLedger 中所有物理文件的最小偏移量。操作消息的物理偏移量小于該值,則從 commitlog 文件中查找;物理偏移量大于等于該值的話則從 DLedger 相關的文件中查找消息。

  • 如果存在舊的 commitlog 文件,則禁止刪除 DLedger 文件,其具體做法就是禁止強制刪除文件,并將文件的有效存儲時間設置為 10 年。

  • 如果 consumequeue 中存儲的最大物理偏移量大于 DLedger 中最大的物理偏移量,則刪除多余的 consumequeue 文件。

>溫馨提示:為什么當存在 commitlog 文件的情況下,不能刪除 DLedger 相關的日志文件呢?

因為在此種情況下,如果 DLedger 中的物理文件有刪除,則物理偏移量會斷層。 RocketMQ中怎么對DLedger進行整合

正常情況下, maxCommitlogPhyOffset 與 dividedCommitlogOffset 是連續的,這樣非常方便是訪問 commitlog 還是 訪問 DLedger ,但如果DLedger 部分文件刪除后,這兩個值就變的不連續,就會造成中間的文件空洞,無法被連續訪問。

DLedgerCommitLog#recover

isInrecoveringOldCommitlog = true;
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
isInrecoveringOldCommitlog = false;

Step3:如果啟用了 DLedger 并且是初次啟動(還未生成 DLedger 相關的日志文件),則需要恢復 舊的 commitlog 文件。

DLedgerCommitLog#recover

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {           // @1
    return;
}
ByteBuffer byteBuffer =  mappedFile.sliceByteBuffer();
byteBuffer.position(mappedFile.getWrotePosition());
boolean needWriteMagicCode = true;
// 1 TOTAL SIZE
byteBuffer.getInt(); //size
int magicCode = byteBuffer.getInt();
if (magicCode == CommitLog.BLANK_MAGIC_CODE) {   // @2
    needWriteMagicCode = false;
} else {
    log.info("Recover old commitlog found a illegal magic code={}", magicCode);
}
dLedgerConfig.setEnableDiskForceClean(false);
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();   // @3
log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset);
if (needWriteMagicCode) {  // @4
    byteBuffer.position(mappedFile.getWrotePosition());
    byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
    byteBuffer.putInt(BLANK_MAGIC_CODE);
    mappedFile.flush(0);
}
mappedFile.setWrotePosition(mappedFile.getFileSize());   // @5
mappedFile.setCommittedPosition(mappedFile.getFileSize());
mappedFile.setFlushedPosition(mappedFile.getFileSize());
dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
}

Step4:如果存在舊的 commitlog 文件,需要將最后的文件剩余部分全部填充,即不再接受新的數據寫入,新的數據全部寫入到 DLedger 的數據文件中。其關鍵實現點如下:

  • 嘗試查找最后一個 commitlog 文件,如果未找到,則結束。

  • 從最后一個文件的最后寫入點(原 commitlog 文件的 待寫入位點)嘗試去查找寫入的魔數,如果存在魔數并等于 CommitLog.BLANK_MAGIC_CODE,則無需再寫入魔數,在升級 DLedger 第一次啟動時,魔數為空,故需要寫入魔數。

  • 初始化 dividedCommitlogOffset ,等于最后一個文件的起始偏移量加上文件的大小,即該指針指向最后一個文件的結束位置。

  • 將最后一個 commitlog 未寫滿的數據全部寫入,其方法為 設置消息體的 size 與 魔數即可。

  • 設置最后一個文件的 wrotePosition、flushedPosition、committedPosition 為文件的大小,同樣有意味者最后一個文件已經寫滿,下一條消息將寫入 DLedger 中。

在啟用 DLedger 機制時 Broker 的啟動流程就介紹到這里了,相信大家已經了解 DLedger 在整合 RocketMQ 上做的努力,接下來我們從消息追加、消息讀取兩個方面再來探討 DLedger 是如何無縫整合 RocketMQ 的,實現平滑升級的。

4、從消息追加看 DLedger 整合 RocketMQ 如何實現無縫兼容

> 溫馨提示:本節同樣也不會詳細介紹整個消息追加(存儲流程),只是要點出與 DLedger(多副本、主從切換)相關的核心關鍵點。如果想詳細了解消息追加的流程,可以閱讀筆者所著的《RocketMQ技術內幕》一書。

DLedgerCommitLog#putMessage

AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dledgerFuture = (AppendFuture<appendentryresponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
}

關鍵點一:消息追加時,則不再寫入到原先的 commitlog 文件中,而是調用 DLedgerServer 的 handleAppend 進行消息追加,該方法會有集群內的 Leader 節點負責消息追加以及在消息復制,只有超過集群內的半數節點成功寫入消息后,才會返回寫入成功。如果追加成功,將會返回本次追加成功后的起始偏移量,即 pos 屬性,即類似于 rocketmq 中 commitlog 的偏移量,即物理偏移量。

DLedgerCommitLog#putMessage

long wroteOffset =  dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);

關鍵點二:根據 DLedger 的起始偏移量計算真正的消息的物理偏移量,從開頭部分得知,DLedger 自身有其存儲協議,其 body 字段存儲真實的消息,即 commitlog 條目的存儲結構,返回給客戶端的消息偏移量為 body 字段的開始偏移量,即通過 putMessage 返回的物理偏移量與不使用Dledger 方式返回的物理偏移量的含義是一樣的,即從開偏移量開始,可以正確讀取消息,這樣 DLedger 完美的兼容了 RocketMQ Commitlog。關于 pos 以及 wroteOffset 的圖解如下: RocketMQ中怎么對DLedger進行整合

5、從消息讀取看 DLedger 整合 RocketMQ 如何實現無縫兼容

DLedgerCommitLog#getMessage

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    if (offset &lt; dividedCommitlogOffset) {   // @1
        return super.getMessage(offset, size);
    }
    int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
    MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);   // @2
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        return  convertSbr(mappedFile.selectMappedBuffer(pos, size));                                       // @3
    }
    return null;
}

消息查找比較簡單,因為返回給客戶端消息,轉發給 consumequeue 的消息物理偏移量并不是 DLedger 條目的偏移量,而是真實消息的起始偏移量。其實現關鍵點如下:

  • 如果查找的物理偏移量小于 dividedCommitlogOffset,則從原先的 commitlog 文件中查找。

  • 然后根據物理偏移量按照二分方找到具體的物理文件。

  • 對物理偏移量取模,得出在該物理文件中中的絕對偏移量,進行消息查找即可,因為只有知道其物理偏移量,從該處先將消息的長度讀取出來,然后即可讀出一條完整的消息。

5、總結

根據上面詳細的介紹,我想讀者朋友們應該不難得出如下結論:

  • DLedger 在整合時,使用 DLedger 條目包裹 RocketMQ 中的 commitlog 條目,即在 DLedger 條目的 body 字段來存儲整條 commitlog 條目。

  • 引入 dividedCommitlogOffset 變量,表示物理偏移量小于該值的消息存在于舊的 commitlog 文件中,實現 升級 DLedger 集群后能訪問到舊的數據。

  • 新 DLedger 集群啟動后,會將最后一個 commitlog 填充,即新的數據不會再寫入到 原先的 commitlog 文件。

  • 消息追加到 DLedger 數據日志文件中,返回的偏移量不是 DLedger 條目的起始偏移量,而是DLedger 條目中 body 字段的起始偏移量,即真實消息的起始偏移量,保證消息物理偏移量的語義與 RocketMQ Commitlog一樣。

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

神池县| 乡城县| 连州市| 安徽省| 新巴尔虎左旗| 德江县| 白河县| 达州市| 田阳县| 乌拉特后旗| 山西省| 巩义市| 吉木乃县| 松溪县| 巍山| 磐石市| 新宁县| 卓尼县| 科技| 武强县| 苏州市| 上林县| 连州市| 新兴县| 资源县| 平潭县| 南安市| 鄂尔多斯市| 岱山县| 壶关县| 张掖市| 临漳县| 揭西县| 巨鹿县| 嘉禾县| 乌兰察布市| 尉犁县| 农安县| 来宾市| 大名县| 长乐市|