亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ存儲中如何實現同步刷盤和異步刷盤

發布時間:2021-11-18 09:36:31 來源:億速云 閱讀:421 作者:小新 欄目:大數據

這篇文章將為大家詳細講解有關RocketMQ存儲中如何實現同步刷盤和異步刷盤,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

一、問題思考

1.同步刷盤是怎么工作的?
2.異步刷盤是怎么工作的?
3.上篇文章的疑問,寫入堆外內存的消息如何落盤的?

二、Broker啟動刷盤有關調用鏈
1.調用鏈

//初始化鏈條
@1 BrokerStartup#main
start(createBrokerController(args));
@2 BrokerStartup#createBrokerController
final BrokerController controller = new BrokerController(...)
boolean initResult = controller.initialize();
@3 BrokerController#initialize
this.messageStore = new DefaultMessageStore(...);
@4 DefaultMessageStore#DefaultMessageStore()
this.commitLog = new CommitLog(this);
@5 CommitLog#CommitLog()
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig()
.getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
//啟動鏈條
@6 BrokerStartup#start
controller.start();
@7 BrokerController#start()
this.messageStore.start();
@8 DefaultMessageStore#start()
this.commitLog.start();
@9 CommitLog#start()
this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
this.commitLogService.start();
}

小結:由調用鏈可以看出,初始化并啟動了以下線程類

1.同步刷盤 GroupCommitService

2.異步刷盤 FlushRealTimeService

3.如果開啟堆外內存并且為異步刷盤 CommitRealTimeService


2.線程類關系圖

RocketMQ存儲中如何實現同步刷盤和異步刷盤

三、線程類工作流程

既然線程類在Broker啟動時就啟動了,他們在做啥呢?

1.堆外內存線程類CommitRealTimeService工作流程

RocketMQ存儲中如何實現同步刷盤和異步刷盤


小結:
1.CommitRealTimeService主要工作是將寫入堆外內存(writeBuffer)的消息,寫入到fileChannel中,fileChannel為commitLog文件通道

2.committedPosition用于記錄將writeBuffer數據寫入到fileChannel中的內存位點(相對偏移量offset)
3.committedWhere用于記錄寫入fileChannel中的物理偏移量(文件名稱+相對偏移量offset)

2.同步刷盤線程類GroupCommitService工作流程

RocketMQ存儲中如何實現同步刷盤和異步刷盤

注1:

1.執行onWaitEnd時交換讀寫容器,該線程類提供兩個容器來裝GroupCommitRequest

2.requestsWrite和requestsRead,每次執行提交(刷盤)前都會進行容器交換

3.好處:讀寫請求容器分離,避免潛在的鎖競爭

private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}

注2:

1.flushedPosition 標記已經刷盤內存的位點。即刷盤相對偏移量,刷盤到什么位置了,下次從此處刷盤即可

2.flushedWhere 標記已經刷盤的物理偏移量,根據此位置可精確查找到文件中消息的存儲位置。flushedWhere = 當前刷盤文件名稱(該日志文件的起始物理偏移量) + flushedPosition

注3:流程圖中標記紅色部分,將刷盤結果通知給等待線程

小結:同步刷盤線程類GroupCommitService主要工作
將請求從讀容器中取出并通過mappedByteBuffer.force()將數據落盤。

3.異步刷盤線程類FlushRealTimeService工作流程

RocketMQ存儲中如何實現同步刷盤和異步刷盤

小結:FlushRealTimeService主要工作
1.不開啟堆外外內存刷盤方式為mappedByteBuffer.force()
2.開啟堆外內存刷盤方式為fileChannel.force


疑問:同步刷盤線程類GroupCommitService每執行一次都會交換讀寫容器,那刷盤請求什么時候放到寫容器(requestsWrite)呢?


四、消息追加與線程類的交互

分析完線程類后,把鏡頭切換到消息追加,看看消息進來后是如何跟線程類交互的?


1.調用鏈

@1 CommitLog#putMessage
//同步刷盤或者異步刷盤
handleDiskFlush(result, putMessageResult, msg);
@2 CommitLog#handleDiskFlush

2.同步刷盤主要代碼

同步刷盤時構造刷盤請求,將請求提交給線程類GroupCommitService,service.putRequest(request),并獲取刷盤結果。

if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
//等待MappedFile刷盤成功狀態通過countDownLatch來控制
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
}
}

3.異步刷盤主要代碼

未開啟堆外內存喚醒FlushRealTimeServicee,開啟堆外內存喚醒CommitRealTimeService。

if (!this.defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}

五、刷盤方式示意圖
1.同步刷盤示意圖

RocketMQ存儲中如何實現同步刷盤和異步刷盤


2.異步刷盤未開啟堆外緩存示意圖

RocketMQ存儲中如何實現同步刷盤和異步刷盤


3.異步刷盤開啟堆外緩存示意圖

RocketMQ存儲中如何實現同步刷盤和異步刷盤

關于“RocketMQ存儲中如何實現同步刷盤和異步刷盤”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

屏边| 清水县| 元江| 色达县| 浦县| 太湖县| 疏附县| 建德市| 通州区| 九寨沟县| 佳木斯市| 阿克陶县| 开封市| 合山市| 沽源县| 阜宁县| 宜丰县| 韶山市| 禹州市| 云梦县| 海林市| 辉南县| 东光县| 贡觉县| 资阳市| 广昌县| 藁城市| 临潭县| 南昌市| 东丽区| 安仁县| 饶河县| 邛崃市| 江山市| 独山县| 北辰区| 江西省| 海南省| 阿克陶县| 怀宁县| 黄平县|