亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ存儲文件是怎樣的

發布時間:2021-10-20 17:59:34 來源:億速云 閱讀:315 作者:柒染 欄目:大數據

RocketMQ存儲文件是怎樣的,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

RocketMQ存儲路徑默認是${ROCKRTMQ_HOME}/store,主要存儲消息、主題對應的消息隊列的索引等。

1、概述

查看其目錄文件

RocketMQ存儲文件是怎樣的

  • commitlog:消息的存儲目錄

  • config:運行期間一些配置信息

RocketMQ存儲文件是怎樣的

  • consumequeue:消息消費隊列存儲目錄

  • index:消息索引文件存儲目錄

  • abort:如果存在abort文件說明Broker非正常關閉,該文件默認啟動時創建,正常退出時刪除

  • checkpoint:文件檢測點。存儲commitlog文件最后一次刷盤時間戳、consumequeue最后一次刷盤時間、index索引文件最后一次刷盤時間戳。

2、文件簡介

2.1、commitlog文件

commitlog文件的存儲地址:$HOME\store\commitlog${fileName},每個文件的大小默認1G =102410241024,commitlog的文件名fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824,以此類推,第三個文件名字為00000000002147483648,起始偏移量為2147483648 ,消息存儲的時候會順序寫入文件,當文件滿了,寫入下一個文件。

RocketMQ存儲文件是怎樣的

commitlog目錄下的文件主要存儲消息,每條消息的長度不同,查看其存儲的邏輯視圖,每條消息的前面4個字節存儲該條消息的總長度。

RocketMQ存儲文件是怎樣的

文件的消息單元存儲詳細信息

編號字段簡稱字段大小(字節)字段含義
1msgSize4代表這個消息的大小
2MAGICCODE4MAGICCODE = daa320a7
3BODY CRC4消息體BODY CRC 當broker重啟recover時會校驗
4queueId4
5flag4
6QUEUEOFFSET8這個值是個自增值不是真正的consume queue的偏移量,可以代表這個consumeQueue隊列或者tranStateTable隊列中消息的個數,若是非事務消息或者commit事務消息,可以通過這個值查找到consumeQueue中數據,QUEUEOFFSET * 20才是偏移地址;若是PREPARED或者Rollback事務,則可以通過該值從tranStateTable中查找數據
7PHYSICALOFFSET8代表消息在commitLog中的物理起始地址偏移量
8SYSFLAG4指明消息是事物事物狀態等消息特征,二進制為四個字節從右往左數:當4個字節均為0(值為0)時表示非事務消息;當第1個字節為1(值為1)時表示表示消息是壓縮的(Compressed);當第2個字節為1(值為2)表示多消息(MultiTags);當第3個字節為1(值為4)時表示prepared消息;當第4個字節為1(值為8)時表示commit消息;當第3/4個字節均為1時(值為12)時表示rollback消息;當第3/4個字節均為0時表示非事務消息
9BORNTIMESTAMP8消息產生端(producer)的時間戳
10BORNHOST8消息產生端(producer)地址(address:port)
11STORETIMESTAMP8消息在broker存儲時間
12STOREHOSTADDRESS8消息存儲到broker的地址(address:port)
13RECONSUMETIMES8消息被某個訂閱組重新消費了幾次(訂閱組之間獨立計數),因為重試消息發送到了topic名字為%retry%groupName的隊列queueId=0的隊列中去了,成功消費一次記錄為0;
14PreparedTransaction Offset8表示是prepared狀態的事物消息
15messagebodyLength4消息體大小值
16messagebodybodyLength消息體內容
17topicLength1topic名稱內容大小
18topictopicLengthtopic的內容值
19propertiesLength2屬性值大小
20propertiespropertiesLengthpropertiesLength大小的屬性數據

2.2、consumequeue

RocketMQ基于主題訂閱模式實現消息的消費,消費者關心的是主題下的所有消息。但是由于不同的主題的消息不連續的存儲在commitlog文件中,如果只是檢索該消息文件可想而知會有多慢,為了提高效率,對應的主題的隊列建立了索引文件,為了加快消息的檢索和節省磁盤空間,每一個consumequeue條目存儲了消息的關鍵信息commitog文件中的偏移量、消息長度、tag的hashcode值。

RocketMQ存儲文件是怎樣的

查看目錄結構:

RocketMQ存儲文件是怎樣的

單個consumequeue文件中默認包含30萬個條目,每個條目20個字節,所以每個文件的大小是固定的20w x 20字節,單個consumequeue文件可認為是一個數組,下標即為邏輯偏移量,消息的消費進度存儲的偏移量即邏輯偏移量。

2.3、IndexFile

IndexFile:用于為生成的索引文件提供訪問服務,通過消息Key值查詢消息真正的實體內容。在實際的物理存儲上,文件名則是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引;

2.3.1、IndexFile結構分析

RocketMQ存儲文件是怎樣的

IndexHead 數據: beginTimestamp:該索引文件包含消息的最小存儲時間 endTimestamp:該索引文件包含消息的最大存儲時間 beginPhyoffset:該索引文件中包含消息的最小物理偏移量(commitlog 文件偏移量) endPhyoffset:該索引文件中包含消息的最大物理偏移量(commitlog 文件偏移量) hashSlotCount:hashslot個數,并不是 hash 槽使用的個數,在這里意義不大, indexCount:已使用的 Index 條目個數

Hash 槽: 一個 IndexFile 默認包含 500W 個 Hash 槽,每個 Hash 槽存儲的是落在該 Hash 槽的 hashcode 最新的 Index 的索引

Index 條目列表 hashcode:key 的 hashcode phyoffset:消息對應的物理偏移量 timedif:該消息存儲時間與第一條消息的時間戳的差值,小于 0 表示該消息無效 preIndexNo:該條目的前一條記錄的 Index 索引,hash 沖突時,根據該值構建鏈表結構

2.3.2、IndexFile條目存儲

RocketMQ將消息索引鍵與消息的偏移量映射關系寫入IndexFile中,其核心的實現方法是public boolean putKey(final String key, final long phyOffset, final long storeTimestamp);參數含義分別是消息的索引、消息的物理偏移量、消息的存儲時間。

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    	//判斷當前的條目數是否大于最大的允許的條目數
        if (this.indexHeader.getIndexCount() < this.indexNum) {
        	//獲取KEY的hash值(正整數)
            int keyHash = indexKeyHashMethod(key);
            //計算hash槽的下標
            int slotPos = keyHash % this.hashSlotNum;
            //獲取hash槽的物理地址
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
            FileLock fileLock = null;
            try {
                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
            	//獲取hash槽中存儲的數據
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                //判斷值是否小于等于0或者 大于當前索引文件的最大條目
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
                //計算當前消息存儲時間與第一條消息時間戳的時間差
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
                //秒
                timeDiff = timeDiff / 1000;
                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                //計算條目的物理地址  = 索引頭部大小(40字節) + hash槽的大小(4字節)*槽的數量(500w) + 當前索引最大條目的個數*每index的大小(20字節)
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                //依次存入 key的hash值(4字節)+消息的物理偏移量(8字節)+消息存儲時間戳和index文件的時間戳差(4字節)+當前hash槽的值(4字節)
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                //存儲當前index中包含的條目數量存入hash槽中,覆蓋原先hash槽的值
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                //更新文件索引的頭信息,hash槽的總數、index條目的總數、最后消息的物理偏移量、最后消息的存儲時間
                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }
        return false;
    }

以上詳細了分析了IndexFile條目存儲的業務邏輯

2.3.3、通過KEY查找消息

RocketMQ存儲文件是怎樣的

DefaultMessageStore類中的public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) 中其核心方法是QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);獲取消息的物理存儲地址,通過偏移量去commitLog中獲取消息集。

public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end)核心方法又是IndexFile類中的public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock)方法

public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
	final long begin, final long end, boolean lock) {
	if (this.mappedFile.hold()) {
		//獲取key的hash信息
		int keyHash = indexKeyHashMethod(key);
		//獲取hash槽的下標
		int slotPos = keyHash % this.hashSlotNum;
		//獲取hash槽的物理地址
		int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
		FileLock fileLock = null;
		try {
			if (lock) {
				// fileLock = this.fileChannel.lock(absSlotPos,
				// hashSlotSize, true);
			}
			//獲取hash槽的值
			int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
			// if (fileLock != null) {
			// fileLock.release();
			// fileLock = null;
			// }
			//判斷值是否小于等于0或者 大于當前索引文件的最大條目
			if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
				|| this.indexHeader.getIndexCount() <= 1) {
			} else {
				for (int nextIndexToRead = slotValue; ; ) {
					if (phyOffsets.size() >= maxNum) {
						break;
					}
					//計算條目的物理地址  = 索引頭部大小(40字節) + hash槽的大小(4字節)*槽的數量(500w) + 當前索引最大條目的個數*每index的大小(20字節)
					int absIndexPos =
						IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
							+ nextIndexToRead * indexSize;
					//獲取key的hash值
					int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
					//獲取消息的物理偏移量
					long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
					//獲取當前消息的存儲時間戳與index文件的時間戳差值
					long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
					//獲取前一個條目的信息(鏈表結構)
					int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
					if (timeDiff < 0) {
						break;
					}
					timeDiff *= 1000L;
					long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
					//判斷該消息是否在查詢的區間
					boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
					//判斷key的hash值是否相等并且在查詢的時間區間內
					if (keyHash == keyHashRead && timeMatched) {
						//加入到物理偏移量的List中
						phyOffsets.add(phyOffsetRead);
					}
					if (prevIndexRead <= invalidIndex
						|| prevIndexRead > this.indexHeader.getIndexCount()
						|| prevIndexRead == nextIndexToRead || timeRead < begin) {
						break;
					}
					//繼續前一個條目信息獲取進行匹配
					nextIndexToRead = prevIndexRead;
				}
			}
		} catch (Exception e) {
			log.error("selectPhyOffset exception ", e);
		} finally {
			if (fileLock != null) {
				try {
					fileLock.release();
				} catch (IOException e) {
					log.error("Failed to release the lock", e);
				}
			}
			this.mappedFile.release();
		}
	}
}
  1. 根據查詢的 key 的 hashcode%slotNum 得到具體的槽的位置( slotNum 是一個索引文件里面包含的最大槽的數目,例如圖中所示 slotNum=5000000)。

  2. 根據 slotValue( slot 位置對應的值)查找到索引項列表的最后一項(倒序排列, slotValue 總是指向最新的一個 索引項)。

  3. 遍歷索引項列表返回查詢時間范圍內的結果集(默認一次最大返回的 32 條記彔)

  4. Hash 沖突;尋找 key 的 slot 位置時相當于執行了兩次散列函數,一次 key 的 hash,一次 key 的 hash 值取模,因此返里存在兩次沖突的情況;第一種, key 的 hash 不同但模數相同,此時查詢的時候會在比較一次key 的hash 值(每個索引項保存了 key 的 hash 值),過濾掉 hash 值不相等的項。第二種, hash 值相等但 key 不等,出于性能的考慮沖突的檢測放到客戶端處理( key 的原始值是存儲在消息文件中的,避免對數據文件的解析),客戶端比較一次消息體的 key 是否相同

2.4、checkpoint

checkpoint文件的作用是記錄commitlog、consumequeue、index文件的刷盤時間點,文件固定長度4k,其中只用了該文件的前24個字節。查看其存儲格式

RocketMQ存儲文件是怎樣的

physicMsgTimestamp:commitlog文件刷盤時間點

logicsMsgTimestamp:消息的消費隊列文件刷盤時間點

indexMsgTimestamp:索引文件刷盤時間點

關于RocketMQ存儲文件是怎樣的問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

海宁市| 漯河市| 临城县| 平凉市| 墨竹工卡县| 遂平县| 肇源县| 霍邱县| 习水县| 蒙阴县| 藁城市| 凌云县| 阿克苏市| 湖南省| 临夏县| 黄山市| 普兰县| 佛教| 宜兴市| 昌乐县| 宁陕县| 高雄县| 陇南市| 化隆| 新邵县| 托克逊县| 大关县| 确山县| 徐闻县| 锡林浩特市| 肃南| 长宁区| 金塔县| 全州县| 建昌县| 韩城市| 福鼎市| 莎车县| 荔浦县| 昌平区| 奉新县|