亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka 0.10 KafkaConsumer流程簡述

發布時間:2020-06-12 22:07:15 來源:網絡 閱讀:619 作者:張濤澤 欄目:網絡安全

ConsumerConfig.scala 儲存Consumer的配置

按照我的理解,0.10的Kafka沒有專門的SimpleConsumer,仍然是沿用0.8版本的。

1.從poll開始

消費的規則如下:

  • 一個partition只能被同一個ConsumersGroup的一個線程所消費.

  • 線程數小于partition數,某些線程會消費多個partition.

  • 線程數等于partition數,一個線程正好消費一個線程.

  • 當添加消費者線程時,會觸發rebalance,partition的分配發送變化.

  • 同一個partition的offset保證消費有序,不同的partition消費不保證順序.

Consumers編程的用法:

private final KafkaConsumer<Long, String> consumer; // 與Kafka進行通信的consumer...
consumer = new KafkaConsumer<Long, String>(props);
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Long, String> records = consumer.poll(512);
...

consumer,是一個純粹的單線程程序,后面所講的所有機制(包括coordinator,rebalance, heartbeat等),都是在這個單線程的poll函數里面完成的。也因此,在consumer的代碼內部,沒有鎖的出現。

1.1包括的組件

從KafkaConsumer的構造函數可以看出,KafkaConsumer有以下幾個核心部件:

  • Metadata: 存儲Topic/Partion與broker的映射關系

  • NetworkClient:網絡層 A network client for asynchronous request/response network i/o.

  • ConsumerNetworkClient: Higher level consumer access to the network layer //對NetworkClient的封裝,非線程安全

  • ConsumerCoordinator:只是client端的類,只是和服務端的GroupCoordinator通信的介質。(broker端的Coordinator 負責reblance、Offset提交、心跳)

  • SubscriptionState: consumer的Topic、Partition的offset狀態維護

  • Fetcher: manage the fetching process with the brokers. //獲取消息

后面會分組件講解Consumers的工作流程

1.2 Consumer消費者的工作過程:

  1. 在consumer啟動時或者coordinator節點故障轉移時,consumer發送ConsumerMetadataRequest給任意一個brokers。在ConsumerMetadataResponse中,它接收對應的Consumer Group所屬的Coordinator的位置信息。

  2. Consumer連接Coordinator節點,并發送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯誤碼,說明協調節點已經在初始化平衡。消費者就會停止抓取數據,提交offsets,發送JoinGroupRequest給協調節點。在JoinGroupResponse,它接收消費者應該擁有的topic-partitions列表以及當前Consumer Group的新的generation編號。這個時候Consumer Group管理已經完成,Consumer就可以開始fetch數據,并為它擁有的partitions提交offsets。

  3. 如果HeartbeatResponse沒有錯誤返回,Consumer會從它上次擁有的partitions列表繼續抓取數據,這個過程是不會被中斷的。


2 設計

2.0 MetaData

見Producer里面的分析。

補充一下,KafkaConsumer、KafkaProducer都是在構造函數中獲取metadata信息,通過調用metadata.update方法來獲取信息。

2.1 coordinator 為什么,做什么

1.去zookeeper依賴 -- 為什么
  • 在0.9以前的client api中,consumer是要依賴Zookeeper的。因為同一個consumer group中的所有consumer需要進行協同,新航道雅思培訓這與后面要講的rebalance有關。(ConsumerConnector、KafkaStream、ConsumerIterator) -- package kafka.consumer

  • 0.9之后新的consumer不依賴與Zookeeper,一個consumerGroup內的consumer由Coordinator管理.(KafkaConsumer) -- package org.apache.kafka.clients.consumer

為什么?后面講

提問:為什么在一個group內部,1個parition只能被1個consumer擁有?

2.coordinator協議/partition分配問題

給定一個topic,有4個partition: p0, p1, p2, p3, 一個group有3個consumer: c0, c1, c2。

  • 那么,如果按RangeAssignor策略,分配結果是:
    c0: p0, c1: p1, c2: p2, p3

  • 如果按RoundRobinAssignor策略:
    c0: p1, p3, c1: p1, c2: p2

  • partition.assignment.strategy=RangeAssignor,默認值

(到底是哪種分配狀態呢)
那這整個分配過程是如何進行的呢?見下圖所示:
Kafka 0.10 KafkaConsumer流程簡述

3步分配過程

1. 步驟1:對于每1個consumer group,Kafka集群為其從broker集群中選擇一個broker作為其coordinator。因此,第1步就是找到這個coordinator。(1個consumer group對應一個coordinattor)

GroupCoordinatorRequest: GCR,由ConsumerNetworkClient發送請求去尋找coordinator。

2. 步驟2:找到coordinator之后,發送JoinGroup請求
consumer在這里會被劃分leader、follower(無責任的說:選擇第一個consumer)

  • leader作用:perform the leader synchronization and send back the assignment for the group(負責發送partition分配的結果)

  • follower作用:send follower's sync group with an empty assignment

3. 步驟3:JoinGroup返回之后,發送SyncGroup,得到自己所分配到的partition
SyncGroupRequest

  • consumer leader發送 SyncGroupRequest給Coordinator,Coordinator回給它null

  • follower發送 null的 SyncGroupRequest 給Coordinator,Coordinator回給它partition分配的結果。

注意,在上面3步中,有一個關鍵點:

  • partition的分配策略和分配結果其實是由client決定的,而不是由coordinator決定的。什么意思呢?在第2步,所有consumer都往coordinator發送JoinGroup消息之后,coordinator會指定其中一個consumer作為leader,其他consumer作為follower。

  • 然后由這個leader進行partition分配。

  • 然后在第3步,leader通過SyncGroup消息,把分配結果發給coordinator,其他consumer也發送SyncGroup消息,獲得這個分配結果。

接下來就到Fetcher拉取數據了

2.2 Fetcher

四個步驟

  1. 步驟0:獲取consumer的offset

  2. 步驟1:生成FetchRequest,并放入發送隊列

  3. 步驟2:網絡poll

  4. 步驟3:獲取結果

1.獲取consumer的offset

當consumer初次啟動的時候,面臨的一個首要問題就是:從offset為多少的位置開始消費。

poll之前,給集群發送請求,讓集群告知客戶端,當前該TopicPartition的offset是多少。通過SubscriptionState來實現, 通過ConsumerCoordinator

if (!subscriptions.hasAllFetchPositions())            updateFetchPositions(this.subscriptions.missingFetchPositions());

核心是:向Coordinator發了一個OffsetFetchRequest,并且是同步調用,直到獲取到初始的offset,再開始接下來的poll.(也就是說Offset的信息如果存在Kafka里,是存在GroupCoordinator里面)

consumer的每個TopicPartition都有了初始的offset,接下來就可以進行不斷循環取消息了,這也就是Fetch的過程:

2.生成FetchRequest,并放入發送隊列 -- fetcher.initFetches(cluster)

核心就是生成FetchRequest: 假設一個consumer訂閱了3個topic: t0, t1, t2,為其分配的partition分別是: t0: p0; t1: p1, p2; t2: p2

即總共4個TopicPartition,即t0p0, t0p1, t1p1, t2p2。這4個TopicPartition可能分布在2臺機器n0, n1上面: n0: t0p0, t1p1 n1: t0p1, t2p2

則會分別針對每臺機器生成一個FetchRequest,即Map<Node, FetchRequest>。所以會有一個方法把所有屬于同一個Node的TopicPartition放在一起,生成一個FetchRequest。

3.網絡poll

調用ConsumerNetworkClient.poll發送網絡請求。向服務器發 送響應請求和獲取服務器的響應。(默認值:executeDelayedTasks=true)

4.獲取結果 -- fetcher.fetchedRecords()

獲取Broker返回的Response,里面包含了List<ConsumerRecord> records

2.3 offset確認機制

  • 是否自動消費確認:由參數auto.xxx.commit=true控制

  • 手動消費:用于自定義Consumers的消費控制

下面從自動消費確認來分析,Offset自動確認是由ConsumerCoordinatorAutoCommitTask來實現的。

其調用在ConsumerNetworkClient的 DelayedTaskQueue delayedTasks里面,然后被周期性的調用。 周期性的發送確認消息,類似HeartBeat,其實現機制也就是前面所講的DelayedQueue + DelayedTask.

確認一次:offset的提交

poll函數中的注釋:
// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records

  • 可以這樣理解:第二次poll調用的時候,提交上一次poll的offset和心跳發送

  • 先提交offset,再去拉取record。那么這次Offset其實是上一次poll的Record的offset。

  • 因此,當你把按照下面的邏輯寫程序的時候,可能會導致Consumer與Coordinator的心跳超時。

    while(true) {
    consumer.poll();do process message // 假如這個耗時過長,那么這個consumer就無法發送心跳給coordinator,導致它錯誤認為這個consumer失去聯系了,引起不必要的rebalance。槽糕的情況下,會丟重復消費數據。}

    因此,有必要把offset的提交單獨拿出來做一個線程。

到這里,就把整個Consumer的流程走完了。


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

黔西县| 城口县| 鄢陵县| 冀州市| 武功县| 肃北| 鄯善县| 西青区| 特克斯县| 韩城市| 黎川县| 屯留县| 连平县| 昌平区| 闽侯县| 湟源县| 双城市| 安福县| 工布江达县| 垦利县| 普宁市| 澄城县| 乡城县| 娄烦县| 三门峡市| 石阡县| 罗平县| 太白县| 红原县| 理塘县| 安远县| 新巴尔虎右旗| 慈利县| 治县。| 肇州县| 玉屏| 三台县| 广德县| 伊吾县| 肇东市| 林周县|