亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

一次KAFKA消費者異常引起的思考

發布時間:2020-07-18 22:37:45 來源:網絡 閱讀:3150 作者:zhanjia 欄目:大數據
問題描述:

線上出現一臺服務器特別慢,于是關閉了服務器上的kafka broker. 關閉后發現一些kafka consumer無法正常消費數據了, 日志錯誤:
o.a.kakfa.clients.consumer.internals.AbstractCordinator Marking the coordinator (39.0.2.100) as dead.

原因:

經過一番排查,發現consumer group信息:
(kafka.coordinator.GroupMetadataMessageFormatter類型):
groupId::[groupId,Some(consumer),groupState,Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs], ...->[]...)],
存到了KAFKA內部topic: __consumer_offsets里, , 它的key是 groupId.
同時發現broker 參數 offsets.topic.replication.factor 錯誤地被設置為1. 這個參數表示TOPIC: __Consumer_offsets 的副本數. 這樣一旦某個broker被關閉, 如果被關閉的Broker 是__Consumer_offsets的某些partition的Leader. 則導致某些consumer group 不可用. 如果一旦broker已經啟動, 需要手工通過命令行來擴展副本數.

reassignment.json:
{"version":1,
 "partitions": [{"topic": "xxx", "partition": 0, "replicas": {brokerId1, brokerId2}}]
}
kafka-reassign-partitions  --zookeeper localhost:2818 --reassignment-json-file  reassignment.json --execute

客戶端尋找Consumer Coordinator的過程:
客戶端 org.apache.kafka.clients.consumer.internals.AbstractCoordinator
如果Coordinator 未知 (AbstractCoordinator.coordinatorUnknown()), 發起請求 lookupCoordinator,向負載最低的節點發送FindCoordinatorRequest

服務端 KafkaApis.handleFindCoordinatorRequest 接收請求:
首先調用 GroupMetaManager.partitionFor(consumerGroupId) consunerGroupId 的 hashCode 對 __consumer_offsets 總的分片數取模獲取partition id 再從 __consumer_offset 這個Topic 中找到partition對應的 Partition Metadata, 并且獲取對應的Partition leader 返回給客戶端

引伸思考

KAFKA 的failover機制究竟是怎么樣的?假使 __consumer_offset 設置了正確的副本數,重選舉的過程是怎樣的. 如果broker宕機后導致某些副本不可用, 副本會自動遷移到其他節點嗎?帶著這些問題稍微閱讀了一下KAFKA的相關代碼:

當一個Broker 被關掉時, 會有兩步操作:
KafkaController.onBrokerFailure ->KafkaController.onReplicasBecomeOffline
主要是通過 PartitionStateMachine.handleStateChanges 方法通知Partition狀態機將狀態置為offline. ReplicaStateMachine.handleStateChanges方法會將Replica 狀態修改為OfflineReplica, 同時修改partition ISR. 如果被關閉broker 是partition leader 那么需要重新觸發partition leader 選舉,最后發送LeaderAndIsrRequest獲取最新的Leader ISR 信息.
KafkaController.unregisterBrokerModificationsHandler 取消注冊的BrokerModificationsHandler 并取消zookeeper 中broker 事件的監聽.

當ISR請求被發出,KafkaApis.handleLeaderAndIsrRequest() 會被調用. 這里如果需要變更leader的partition是屬于__consumer_offset這個特殊的topic,取決于當前的broker節點是不是partition leader. 會分別調用GroupCoordinator.handleGroupImmigrationGroupCoordinator.handleGroupEmmigration. 如果是partition leader, GroupCoordinator.handleGroupImmigration -> GroupMetadataManager.loadGroupsForPartition 會重新從 __consumer_offset 讀取group數據到本地metadata cache, 如果是partition follower, GroupCoordniator.handleGroupImigration -> GroupMetadataManager.removeGroupsForPartition 會從metadata cache中移除group信息. 并在onGroupUnloaded回調函數中將group的狀態變更為dead. 同時通知所有等待join或者sync的組成員.

KAFKA在Broker關閉時不會自動做partition 副本的遷移. 這時被關閉的Broker上的副本變為under replicated 狀態. 這種狀態將持續直到Broker被重新拉起并且追上新的數據, 或者用戶通過命令行 手動復制副本到其他節點.

官方建議設置兩個參數來保證graceful shutdown. controlled.shutdown.enable=true auto.leader.rebalance.enable=true前者保證關機之前將日志數據同步到磁盤,并進行重選舉. 后者保證在broker重新恢復后再次獲得宕機前leader狀態. 避免leader分配不均勻導致讀寫熱點.

Reference

https://blog.csdn.net/zhanglh046/article/details/72833129
https://blog.csdn.net/huochen1994/article/details/80511038
https://www.jianshu.com/p/1aba6e226763

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

青海省| 新干县| 兰溪市| 涟源市| 七台河市| 元阳县| 张掖市| 文成县| 长乐市| 岚皋县| 涞源县| 萝北县| 洛南县| 买车| 新化县| 天等县| 龙游县| 荥经县| 云和县| 东平县| 鄢陵县| 民县| 淅川县| 灌阳县| 荣成市| 宣化县| 昌宁县| 邛崃市| 正阳县| 安义县| 大竹县| 临武县| 斗六市| 武鸣县| 惠州市| 唐山市| 富宁县| 建瓯市| 隆化县| 汾阳市| 江阴市|