亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行kafka批量消費多消費者問題分析

發布時間:2021-12-15 09:29:26 來源:億速云 閱讀:741 作者:柒染 欄目:大數據

今天就跟大家聊聊有關如何進行kafka批量消費多消費者問題分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record =" + record);
            System.out.println("----------------- message =" + message);
        }
    }@KafkaListener(id = "2", topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )
            })public void listen2(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record 1=" + record);
            System.out.println("------------------ message 1=" + message);
        }
    }//id = "4",    //id="4"    @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )*//*            },*/ containerFactory = "kafkaBatchListener6")public void listen3(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack        try {for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 4=" + record);//   System.out.println("------------------ message 4=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }//id="5"    @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            },*/ containerFactory = "kafkaBatchListener6")public void listen2(List<ConsumerRecord<?, ?>> records) {//, Acknowledgment ack        try {for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 6=" + record);//   System.out.println("------------------ message 6=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }//https://www.cnblogs.com/linjiqin/p/13171789.html    @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            }, */containerFactory = "kafkaBatchListener6")public void listen4(List<ConsumerRecord<?, ?>> records) {try {for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 3=" + record);//   System.out.println("------------------ message 6=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }

}

一個partition只能有一個消費者,如果多個消費者會是廣播模式,每個消費者都會有一條數據,kafka是一個發布和訂閱模式的主鍵,并不是隊列模式,

spring boot整合時,如果使用topicPartitions 注解參數指定partition會有消息重復消費的問題,最好使用topics注解,并指定groupId。

看完上述內容,你們對如何進行kafka批量消費多消費者問題分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

融水| 潜江市| 明水县| 廊坊市| 肇州县| 宁陵县| 饶河县| 抚宁县| 沙坪坝区| 烟台市| 昌平区| 肇庆市| 盖州市| 浦县| 云南省| 开原市| 广饶县| 昂仁县| 朔州市| 霞浦县| 通许县| 天门市| 惠东县| 紫金县| 巴南区| 临夏县| 绥阳县| 广安市| 台前县| 阳谷县| 荣成市| 右玉县| 临夏市| 那坡县| 定襄县| 永川市| 安图县| 博白县| 镶黄旗| 黄梅县| 新竹市|