亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka單線程Consumer及參數詳解

發布時間:2020-06-14 19:08:20 來源:網絡 閱讀:209 作者:實時計算 欄目:大數據

請使用0.9以后的版本:

示例代碼
 Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("auto.offset.reset","earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
      try{  
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
         }
        }finally{
          consumer.close();
        }

1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);

2、用這些Properties構建consumer對象(KafkaConsumer還有其他構造,可以把序列化傳進去);

3、subscribe訂閱topic列表(可以用正則訂閱Pattern.compile("kafka.*")

使用正則必須指定一個listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重寫這個接口來實現 分區變更時的邏輯。如果設置了enable.auto.commit = true 就不用理會這個邏輯。

4、然后循環poll消息(這里的1000是超時設定,如果沒有很多數據,也就等一秒);

5、處理消息(打印了offset key value 這里寫處理邏輯)。

6、關閉KafkaConsumer(可以傳一個timeout值 等待秒數 默認是30)。

參數詳解

bootstrap.server(最好用主機名不用ip kafka內部用的主機名 除非自己配置了ip)

deserializer 反序列化consumer從broker端獲取的是字節數組,還原回對象類型。

默認有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定義:定義serializer格式 創建自定義deserializer類實現Deserializer 接口 重寫邏輯

?

除了四個必傳的 bootstrap.server group.id key.deserializer value.deserializer

還有session.timeout.ms "coordinator檢測失敗的時間"

是檢測consumer掛掉的時間 為了可以及時的rebalance 默認是10秒 可以設置更小的值避免消息延遲。

max.poll.interval.ms "consumer處理邏輯最大時間"

處理邏輯比較復雜的時候 可以設置這個值 避免造成不必要的 rebalance ,因為兩次poll時間超過了這個參數,kafka認為這個consumer已經跟不上了,會踢出組,而且不能提交offset,就會重復消費。默認是5分鐘。

auto.offset.reset "無位移或者位移越界時kafka的應對策略"

所以如果啟動了一個group從頭消費 成功提交位移后 重啟后還是接著消費 這個參數無效

所以3個值的解釋是:

earliset 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從最早的位移消費

latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 none topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常

(注意kafka-0.10.1.X版本之前:?auto.offset.reset 的值為smallest,和,largest.(offest保存在zk中) 、

我們這是說的是新版本:kafka-0.10.1.X版本之后:?auto.offset.reset 的值更改為:earliest,latest,和none (offest保存在kafka的一個特殊的topic名為:__consumer_offsets里面))

enable.auto.commit 是否自動提交位移

true 自動提交 false需要用戶手動提交 有只處理一次需要的 最近設置為false自己控制。

fetch.max.bytes consumer單次獲取最大字節數

max.poll.records 單次poll返回的最大消息數

默認500條 如果消費很輕量 可以適當提高這個值 增加消費速度。

hearbeat.interval.ms consumer其他組員感知rabalance的時間

該值必須小于 session.timeout.ms 如果檢測到 consumer掛掉 也就根本無法感知rabalance了

connections.max.idle.ms 定期關閉連接的時間

默認是9分鐘 可以設置為-1 永不關閉

更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算

Kafka單線程Consumer及參數詳解

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

慈利县| 宁城县| 兴仁县| 新疆| 乾安县| 兰西县| 潢川县| 张家港市| 通榆县| 辽宁省| 民丰县| 永定县| 海门市| 夏津县| 桂东县| 阳高县| 上思县| 晋州市| 当阳市| 长沙县| 个旧市| 琼海市| 宜城市| 盐亭县| 喜德县| 丽江市| 阿尔山市| 民权县| 丹江口市| 岳池县| 慈溪市| 鄂伦春自治旗| 石门县| 连江县| 囊谦县| 新化县| 长泰县| 松原市| 涿鹿县| 弥渡县| 阿巴嘎旗|