亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

kafka consumer怎么使用

發布時間:2021-12-16 16:48:14 來源:億速云 閱讀:246 作者:iii 欄目:云計算

這篇文章主要介紹“kafka consumer怎么使用”,在日常操作中,相信很多人在kafka consumer怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”kafka consumer怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

consumer作為kafka當中一個重要元素,它的常用操作并不復雜,說白了無非就是2點,1、把數據poll出來,2、把位置標記上。我們找到kafka的java api doc,找到了官方提供的幾種consumer操作的例子,逐一進行分析,看看都有幾種操作類型。

Automatic Offset Committing

自動 Offset 提交

這個例子顯示了一個基于offset自動提交的consumer api的簡單應用。

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

enable.auto.commit 意味著offset將會得到自動提交,而這個自動提交的時間間隔由 auto.commit.interval.ms 來進行控制。

客戶端通過 bootstrap.servers 的配置來連接服務器,這個配值當中可以是一個或多個broker,需要注意的是,這個配置僅僅用來讓客戶端找到我們的server集群,而不需要把集群當中的所有服務器地址都列上。

在這個例子當中,客戶端作為test group的一員,訂閱了foo和bar2個topic。

( 這一段直接翻譯很蹩腳,我會試著根據自己的理解翻譯出來)首先假設,foo和bar這2個topic,都分別有3個partitions,同時我們將上面的代碼在我們的機器上起3個進程,也就是說,在test group當中,目前有了3個consumer,一般來講,這3個consumer會分別獲得 foo和bar 的各一個partitions,這是前提。3個consumer會周期性的執行一個poll的動作(這個動作當中隱含的有一個heartbeat的發送,來告訴cluster我是活的),這樣3個consumer會持續的保有他們對分配給自己的partition的訪問的權利,如果某一個consumer失效了,也就是poll不再執行了,cluster會在一段時間( session.timeout.ms )之后把partitions分配給其他的consumer。

反序列化的設置,定義了如何轉化bytes,這里我們把key和value都直接轉化為string。

Manual Offset Control

手動的offset控制

除了周期性的自動提交offset之外,用戶也可以在消息被消費了之后提交他們的offset。

某些情況下,消息的消費是和某些處理邏輯相關聯的,我們可以用這樣的方式,手動的在處理邏輯結束之后提交offset。

簡要地說,在這個例子當中,我們希望每次至少消費200條消息并將它們插入數據庫,之后再提交offset。如果仍然使用前面的自動提交方式,就可能出現消息已經被消費,但是插入數據庫失敗的情況。這里可以視作一個簡單的事務封裝。

但是,有沒有另一種可能性,在插入數據庫成功之后,提交offset之前,發生了錯誤,或者說是提交offset本身發生了錯誤,那么就可能出現某些消息被重復消費的情況。

個人認為這段話說的莫名其妙,簡單地說,采用這樣的方式,消息不會被丟失,但是有可能出現重復消費。

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }

上面的例子當中,我們用commitSync來標記所有的消息;在有些情況下,我們可能希望更加精確的控制offset,那么在下面的例子當中,我們可以在每一個partition當中分別控制offset的提交。

try {
         while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

注意:提交的offset應該是next message,所以,提交的時候需要在當前最后一條的基礎上+1.

Manual Partition Assignment

手動的分區分配

前面的例子當中,我們訂閱一個topic,然后讓kafka把該topic當中的不同partitions,公平的在一個consumer group內部進行分配。那么,在某些情況下,我們希望能夠具體的指定partitions的分配關系。

  • 如果某個進程在本地管理了和partition相關的狀態,那么它只需要獲得跟他相關partition。


  • 如果某個進程自身具備高可用性,那么就不需要kafka來檢測錯誤并重新分配partition,因為消費者進程會在另一臺設備上重新啟動。

要使用這種模式,可以用assign方法來代替subscribe,具體指定一個partitions列表。

String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));

分配之后,就可以像前面的例子一樣,在循環當中調用poll來消費消息。手動的分區分配不需要組協調,所以消費進程失效之后,不會引發partition的重新分配,每一個消費者都是獨立工作的,即使它和其他消費者屬于同一個group。為了避免offset提交的沖突,在這種情況下,通常我們需要保證每一個consumer使用自己的group id。

需要注意的是,手動partition分配和通過subscribe實現的動態的分區分配,2種方式是不能混合使用的。

到此,關于“kafka consumer怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

米易县| 睢宁县| 额济纳旗| 南丰县| 渝北区| 团风县| 扶沟县| 乌拉特后旗| 定西市| 白山市| 安塞县| 彰武县| 陆丰市| 丰县| 南宁市| 乌拉特中旗| 莱芜市| 布尔津县| 巨野县| 改则县| 皋兰县| 上栗县| 茌平县| 铁力市| 万载县| 确山县| 金坛市| 台南市| 黄平县| 高唐县| 黄浦区| 佛坪县| 婺源县| 休宁县| 文安县| 沙坪坝区| 杭州市| 镇安县| 华容县| 通河县| 施甸县|