亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ源碼中如何實現注冊服務器

發布時間:2021-12-17 16:13:25 來源:億速云 閱讀:155 作者:小新 欄目:云計算

這篇文章給大家分享的是有關RocketMQ源碼中如何實現注冊服務器的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

NamesrvStartup

該類用于啟動注冊服務器。其main方法委托了main0方法,該方法的執行邏輯如下:

  1. 調用方法NamesrvStartup#createNamesrvController創建一個NamesrvController實例,聲明為controller

  2. 調用方法NamesrvStartup#start將這個controller啟動。

那么下面就分別來看下兩個方法的具體內容。

createNamesrvController

這個方法最重要的就是用構造方法創建了NamesrvController對象。而在調用構造方法之前有較多的代碼是用于解析命令行對象,以及可能的情況下讀取文件中的配置信息、打印當前的整體配置信息。

這些額外配置不存在的時候,默認配置下,注冊服務器是監聽于9876端口。

start

該方法的作用是啟動入參的NamesrvController實例。具體來說,流程如下:

  1. 執行方法NamesrvController#initialize進行初始化。

  2. 為運行時添加一個hook,在JVM關閉的時候,執行方法NamesrvController#shutdown對注冊服務器執行優雅關閉。

  3. 執行方法NamesrvController#start啟動注冊服務器。

NamesrvController

這個類用于控制注冊服務器。

構造方法

構造方法中主要是為了幾個重要屬性進行賦值操作。比如初始化kvConfigManagerrouteInfoManager這兩個重要的屬性。

initialize

該方法用于初始化注冊服務器,執行邏輯如下:

  1. 執行方法kvconfig.KVConfigManager#load加載配置信息。默認情況下,加載 ${user.home}/namesrv/kvConfig.json 文件的內容到屬性kvconfig.KVConfigManager#configTable中。

  2. 新建一個NettyRemotingServer對象,為屬性NamesrvController#remotingServer賦值。這個新建的對象,使用了BrokerHousekeepingService作為入參。該BrokerHousekeepingService的作用就是在發生通道關閉、異常、空閑等情況時,將該通道從路由信息里刪除。

  3. 創建一個線程池,賦值給屬性NamesrvController#remotingExecutor,用于注冊服務器在Netty中的業務執行。

  4. 調用方法NamesrvController#registerProcessor將業務處理器注冊到RemotingServer中。使用的線程池就是步驟3創建的線程池。

  5. 創建一個間隔時間為10秒的周期性任務,任務內容是調用方法RouteInfoManager#scanNotActiveBroker掃描非激活模式的Broker

start

該方法沒有更多內容,只是簡單了啟動了RemotingServer。在這個方法之后,就可以開始監聽Broker上送的注冊請求。

KVConfigManager

該類是注冊服務器的配置存儲類。會將配置信息存儲在文件 ${user.home}/namesrv/kvConfig.json 。內部用來存儲配置信息的是一個HashMap<String, HashMap<String, String>> 結構,也就是兩級結構。

第一級是命名空間,第二集是KV對,都是字符串形式。

該類的load方法可以從文件中加載數據到內存里,persist方法可以將內存中的數據再寫入到文件中。

DefaultRequestProcessor

這個類是 rocketmq-namesrv 這個包下面,代碼量最多的類了。因為業務處理都實現在了這個類上面。

按照NettyRequestProcessor接口的實現套路,業務請求的分流都是在processRequest方法中,這里也是,接下來就一個個看這個類支持的命令。

PUT_KV_CONFIG

該命令沒有請求體,請求頭中有namespacekeyvalue字段,調用方法kvconfig.KVConfigManager#putKVConfig將配置項放入到配置管理器中即可。

GET_KV_CONFIG

該命令沒有請求體,請求頭中有namespacekey字段,調用方法kvconfig.KVConfigManager#getKVConfig獲取對應配置項。

如果配置項存在,返回成功響應。如果配置信息不存在,返回失敗響應,響應碼為QUERY_NOT_FOUND

DELETE_KV_CONFIG

該命令沒有請求體,請求頭中有namespacekey字段,調用方法kvconfig.KVConfigManager#deleteKVConfig刪除對應配置項。

QUERY_DATA_VERSION

該命令用于查詢注冊服務器上Broker的數據版本號。具體執行邏輯如下:

  1. 從命令的內容體解析出DataVersion對象,從請求頭中解析出BrokerAddr數據。使用這兩個作為入參,調用方法RouteInfoManager#isBrokerTopicConfigChanged判斷與服務器上該BrokerAddr的版本號是否一致,將結果聲明為changed

  2. 如果changedfalse,表明版本號沒有變化,那么服務器上的數據在當前時間還是有效的,調用方法RouteInfoManager#updateBrokerInfoUpdateTimestamp更新這個數據的有效時間。

  3. 調用方法RouteInfoManager#queryBrokerTopicConfig查詢服務器上BrokerAddr對應的版本號,聲明為nameSeverDataVersion

  4. 構建命令響應對象,如果nameSeverDataVersion不為null,則編碼后設置到內容體。在響應頭中設置changed屬性,值為步驟1產生的聲明對象。

REGISTER_BROKER

該命令用于Broker信息的注冊。首先獲取請求頭中MQ的版本號,如果版本號大于等于3.0.11,則調用方法processor.DefaultRequestProcessor#registerBrokerWithFilterServer進行信息注冊;否則調用方法processor.DefaultRequestProcessor#registerBroker進行信息注冊。

registerBrokerWithFilterServer

方法的執行邏輯如下:

  1. 對請求命令進行解碼工作,創建出RegisterBrokerRequestHeader對象。使用該對象對象和請求中的body字段執行crc校驗,如果校驗失敗,返回系統錯誤響應。否則,繼續后續流程。

  2. 如果命令請求對象中包含內容體,則解碼出RegisterBrokerBody對象,聲明為registerBrokerBody。如果命令請求對象不包含內容體,則手動創建RegisterBrokerBody對象,并且將其DataVersion的版本號設置為0,時間戳設置為0.

  3. 調用方法RouteInfoManager#registerBroker注冊路由信息,將結果聲明為result

  4. 創建類型為RegisterBrokerResponseHeader的響應頭對象,聲明為responseHeader。將resultmasterAddrHaServerAddr屬性設置到響應頭對象中。

  5. 從配置管理器中以ORDER_TOPIC_CONFIG作為命名空間,取出該命名空間下面的配置數據對象,編碼后將二進制設置為響應的內容體。

  6. 返回響應對象。

registerBroker

registerBrokerWithFilterServer方法的流程基本一致,只不過在調用方法RouteInfoManager#registerBroker的時候,入參的filterServerList為null。

UNREGISTER_BROKER

該命令用于注銷 Broker 的注冊。調用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#unregisterBroker完成,而該方法內部則是委托給了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker

GET_ROUTEINFO_BY_TOPIC

該命名用于查詢主題的路由信息,調用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic

該方法用于在路由管理器中根據主題名稱獲取全量的路由信息,具體流程如下:

  1. 使用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData根據請求的主題名稱得到類型為TopicRouteData的結果,聲明為topicRouteData

  2. 如果topicRouteData不為null,則執行如下子流程。

    1. 如果配置org.apache.rocketmq.common.namesrv.NamesrvConfig#orderMessageEnable開啟,則從命名空間ORDER_TOPIC_CONFIG下面,獲取入參主題名稱的配置信息,聲明為orderTopicConf。將orderTopicConf設置到屬性org.apache.rocketmq.common.protocol.route.TopicRouteData#orderTopicConf

    2. topicRouteData進行編碼,設置為響應的內容體,返回響應對象。

  3. 如果topicRouteData為null,則返回TOPIC_NOT_EXIST響應。

GET_BROKER_CLUSTER_INFO

該命令調用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfo。該方法的邏輯就是調用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfo得到一個編碼后的內容體,將這個內容體設置為響應的內容體,返回響應對象即可。

編碼的內容體數據結構類是ClusterInfo,其屬性如下

HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

WIPE_WRITE_PERM_OF_BROKER

該命令用于擦除Broker的寫權限,也就說所有在該Broker上的主題都沒有寫入權限了。調用了方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#wipeWritePermOfBroker實現,該方法的邏輯如下:

  1. 調用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#wipeWritePermOfBrokerByLock擦除入參Broker的寫權限,方法的返回值為擦除的隊列信息個數。將結果聲明為wipeTopicCnt

  2. wipeTopicCnt設置到響應頭的對應屬性,返回響應。

GET_ALL_TOPIC_LIST_FROM_NAMESERVER

該命令用于獲取注冊服務器上全量的主題信息,調用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserver實現。

該方法內部調用方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicList獲取所有的主題名稱形成的列表,并且編碼為二進制數組,設置為響應的內容體,將響應返回。

DELETE_TOPIC_IN_NAMESRV

該命令用于刪除服務器上的主題信息,通過方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv實現。方法實現也簡單,直接從topicQueueTable中刪除對應的主題名稱即可。

GET_KVLIST_BY_NAMESPACE

該命令用于獲取服務器上特定命名空間下的配置信息。通過方法org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#getKVListByNamespace獲取到對應的配置信息,并且編碼為二進制數組。

如果數組存在,則設置到響應的內容體中,返回成功響應。

如果數組不存在,則返回QUERY_NOT_FOUND響應。

GET_TOPICS_BY_CLUSTER

該命令用于獲取集群下所有的主題名稱,調用方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByCluster完成。該方法內部調用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster獲取集群下的所有主題名稱的編碼結果,將編碼結果的二進制數組設置到響應的內容體中,返回成功響應。

GET_SYSTEM_TOPIC_LIST_FROM_NS

這個命令有點奇怪,看命令名稱是獲取系統主題列表。但是從方法實現上,內部的內容整體是混亂的。這個命令暫且放下,等看到相關聯的請求查詢的時候在處理。

GET_UNIT_TOPIC_LIST

該命令用于獲取集群下,有unit標識的主題名稱集合。通過方法org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList實現,該方法內部調用了方法org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics來返回具備unit標識的主題名稱集合的編碼后二進制數組。將這個數組設置為響應的內容體,并且返回。

GET_HAS_UNIT_SUB_TOPIC_LIST

該命令用于獲取集群下,有unit_sub標識的主題名稱集合。做法上與GET_UNIT_TOPIC_LIST命令是相同的,只不過用的標識不同。

GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST

該命令用于獲取集群下,同時有unitunit_sub標識的主題名稱集合。做法上與上述的一致,只不過用的標識不同。

UPDATE_NAMESRV_CONFIG

這個命令是用于管理端直接發送配置的文本到注冊服務,用于更新注冊服務自身的配置,而后將配置信息持久化到磁盤文件。

GET_NAMESRV_CONFIG

這個命令用于獲取注冊服務的配置信息,將配置信息設置到響應的內容體中。

RouteInfoManager

該類是路由信息的管理器,其中使用了多個類來抽象各種路由信息。下面先看下這些定義類。

QueueData

該類保存了Broker中的隊列信息。有如下屬性:

  • brokerName,Broker的名稱,默認情況下是Broker所在機器的域名,可以由配置定義。

  • readQueueNums,用于讀取的隊列數量。

  • writeQueueNums,用于寫入的隊列數量。

  • perm,該Broker的權限信息,權限指的是是否可讀、是否可寫。

  • topicSynFlag,主題同步標識。

BrokerData

該類保存了Broker集群的地址信息,有如下屬性:

  • cluster,集群標識。

  • brokerName,Broker名稱。

  • brokerAddrs,brokerId和BrokerAddr的映射表。該屬性存儲了同一個Broker名稱下id和地址的映射關系。

BrokerLiveInfo

該類保存了具體某個Broker的存活信息,有如下屬性

  • lastUpdateTimestamp,最近一次數據更新時間。

  • dataVersion,該Broker的主題配置信息的版本號。

  • channel,Netty的Channel對象,該對象即是Broker與服務器之間的鏈接對象。

  • haServerAddr,高可用主節點地址。格式為${ip}:${port} 。

存儲屬性

RouteInfoManager內部管理著5個Map結構,用于存儲路由相關信息,這些信息用代碼來看會更清晰一些,如下:

HashMap<String/* topic */, List<QueueData>> topicQueueTable;
HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

registerBroker

該方法用于實現Broker信息注冊到路由管理器上,具體方法流程如下:

  1. clusterAddrTable中以入參的clusterName獲取集群下所有Broker的名稱,聲明為brokerNames

  2. 如果brokerNames為null,則為其賦值一個空的HashSet<String>。并且在clusterAddrTable放入這個clusterNamebrokerNames兩個值。

  3. brokerNames中添加本次注冊上來的Broker的名稱。

  4. brokerAddrTablebrokerName獲取BrokerData對象,如果不存在則新建一個并且放入到brokerAddrTable中。

  5. 取出步驟4中brokerData中的brokerAddrs映射,遍歷其中的元素,如果值與入參的brokerAddr相等,鍵與入參的brokerId不等,則刪除這個這一鍵值對。這種情況說明此時該IP對應的Broker信息已經發生了變化。

  6. 將入參的brokerIdbrokerAddr放入到brokerAddrs中。

  7. 如果brokerId為0也就是主節點,并且入參的topicConfigWrapper不為null,也就是說Broker發送的注冊命令是包含了請求體,那么執行子流程。否則繼續后續流程。

    1. brokerLiveTable查詢該broker的版本號,與topicConfigWrapper的版本號對比,確認是否有變化。如果有變化,或者該Broker是新注冊的(brokerName第一次注冊或者brokerId第一次注冊),那么就很有可能本次攜帶了新的主題配置信息。則需要更更新注冊服務器上主題配置信息。也就執行后續流程。否則結束子流程,繼續執行步驟8.

    2. 遍歷屬性TopicConfigSerializeWrapper#topicConfigTable,對集合中每一個元素調用方法RouteInfoManager#createAndUpdateQueueData,更新主題對應的隊列信息。

  8. 構建BrokerLiveInfo對象,放入brokerLiveTable中。

  9. 如果入參的filterServerList不為null,則放入filterServerTable

  10. 如果brokerId不為0,也就是當前是從節點在注冊自己,則從brokerAddrs獲取主節點的地址。如果主節點地址存在,則進一步獲取其 HaServer 地址。將這兩個數據設置到返回的結果對象result中。

  11. 返回結果對象result。從代碼可以看出,如果當前注冊不是從節點,或者對應的主節點不存在,則result是一個空對象。

createAndUpdateQueueData

該方法是用于創建或更新 topicQueueTable 中的QueueData對象的。具體流程如下:

  1. 構建一個QueuData對象,里面的屬性來自brokerName、topicConfig 對象。

  2. 從 topicQueueTable 中獲取topicConfig 主題對應的 queueDataList 對象。

  3. 如果 queueDataList 不存在,意味著該主題是第一次出現在注冊服務器中。構建一個新的linkedList對象,添加queueData對象到其中,并且將queueDataList放入到topicQueueTable中。流程結束。

  4. 如果queueDataList存在,則對其元素遍歷,執行如下子操作。

    1. 元素的brokerName屬性與入參的brokerName值相同,則繼續執行后續流程,否則進入下一次循環迭代。

    2. 判斷元素與步驟1構建的對象是否相同,如果相同,不做操作;如果不同,意味著數據有變化,將元素從集合中刪除。

  5. 如果步驟4中有元素被刪除,則將步驟1的對象,添加到queueDataList中。

unregisterBroker

該方法用于在刪除路由管理器中某一個Broker的信息。具體流程如下:

  1. brokerLiveTable中刪除該Broker信息。

  2. filterServerTable刪除該Broker的信息。

  3. 聲明一個局部變量removeBrokerName。從brokerAddrTable獲取該BrokerName對應的brokerData。如果其不為空,則執行子流程。

    1. brokerDatabrokerAddrs刪除該brokerId對應的映射。

    2. 如果brokerAddrs集合為空,則從brokerAddrTable刪除該brokerName對應的映射。為removeBrokerName賦值true

  4. 如果removeBrokerName為真,則執行子流程,否則流程結束。

    1. clusterAddrTable獲取該clusterName對應的brokerName的集合,聲明為nameSet

    2. nameSet不為null的情況下,從nameSet刪除本次的brokerName。如果刪除后nameSet為空,則從clusterAddrTable刪除該brokerName的映射。

    3. 調用方法removeTopicByBrokerName刪除brokerName對應的主題信息。

removeTopicByBrokerName

該方法用于刪除brokerName對應的主題配置信息,具體執行邏輯如下:

  1. 遍歷topicQueueTable,為每一個元素執行后續邏輯。

  2. 針對每一個元素,取出其QueueData列表,遍歷該對象。執行子流程。

    1. 遍歷QueueData列表,如果元素QueueDatabrokerName與入參brokerName相同,則從列表中刪除該元素。

    2. 遍歷完畢后,如果列表為空,則從topicQueueTable中刪除該映射。

pickupTopicRouteData

首先來看下數據結構對象TopicRouteData的定義,其屬性如下

String orderTopicConf;
List<QueueData> queueDatas;
List<BrokerData> brokerDatas;
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable

從數據結構對象也可以簡單的推測出pickupTopicRouteData方法的實現邏輯。大致來說分為幾個步驟:

  1. topicQueueTable按照主題名稱查詢queueDatas

  2. 根據queueDatas中每一個元素QueueDatabrokerName屬性從brokerAddrTable取得brokerData對象,組成成一個List,也就是brokerDatas

  3. 根據步驟2的brokerDatasfilterServerTable查詢到對應的filterServer列表,組裝為映射。

  4. 將步驟1到3的值組裝為TopicRouteData對象返回給調用者。

wipeWritePermOfBrokerByLock

該方法會以可中斷的方式獲取寫鎖,獲取成功后調用方法wipeWritePermOfBroker。如果獲取失敗則返回0,獲取成功則執行方法wipeWritePermOfBroker執行擦除工作。

wipeWritePermOfBroker方法的內容也很簡單,遍歷topicQueueTable,針對每一個元素,在遍歷其QueueData,如果brokerName與入參的brokerName相同就意味著找到對應的QueueData。將這個里面的perm屬性重新設置值,去掉代表寫權限的標志位即可。

getUnitTopics

該方法用于獲取具備unit標識的主題名稱集合。具體流程如下:

  1. 以可中斷的方式獲取讀鎖。遍歷topicQueueTable元素。

  2. 如果鍵值對中的QueueData列表的首個元素的topicSynFlag屬性值包含了unit標識,將這個鍵值對的key,也即是主題名稱加入到臨時集合中。

  3. 遍歷完后后,返回臨時集合編碼的二進制數組。

onChannelDestroy

當一個Broker的通道關閉的時候,會觸發到這個方法。這個方法的代碼雖然比較多,但是方法思路很簡單,首先通過Channel在brokerLiveTable中找到對應的BrokerLiveInfo對象。并且依靠這個對象的信息,在路由管理器中刪除所有相關的信息接口。

感謝各位的閱讀!關于“RocketMQ源碼中如何實現注冊服務器”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

洞口县| 南雄市| 贺州市| 鄂托克旗| 马边| 洞口县| 武清区| 长治市| 陆丰市| 仪陇县| 呼玛县| 息烽县| 绥棱县| 江达县| 老河口市| 巨鹿县| 土默特右旗| 华安县| 祥云县| 奈曼旗| 南京市| 吴江市| 侯马市| 泰州市| 项城市| 沂南县| 进贤县| 温泉县| 巴塘县| 温州市| 双城市| 黎城县| 光山县| 广平县| 和平县| 海晏县| 安多县| 五常市| 广东省| 和田市| 吴桥县|