亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spring?kafka框架中@KafkaListener注解怎么使用

發布時間:2023-02-25 11:42:55 來源:億速云 閱讀:166 作者:iii 欄目:開發技術

這篇文章主要介紹“spring kafka框架中@KafkaListener注解怎么使用”,在日常操作中,相信很多人在spring kafka框架中@KafkaListener注解怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”spring kafka框架中@KafkaListener注解怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

    簡介

    Kafka 目前主要作為一個分布式的發布訂閱式的消息系統使用,也是目前最流行的消息隊列系統之一。因此,也越來越多的框架對 kafka 做了集成,比如本文將要說到的 spring-kafka。

    Kafka 既然作為一個消息發布訂閱系統,就包括消息生成者和消息消費者。本文主要講述的 spring-kafka 框架的 kafkaListener 注解的深入解讀和使用案例。

    解讀

    源碼解讀

    @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
    
    @Retention(RetentionPolicy.RUNTIME)
    
    @MessageMapping
    
    @Documented
    
    @Repeatable(KafkaListeners.class)
    
    public @interface KafkaListener {
       /**
    
        * 消費者的id,當GroupId沒有被配置的時候,默認id為GroupId
    
        */
    
       String id() default "";
       /**
    
        * 監聽容器工廠,當監聽時需要區分單數據還是多數據消費需要配置containerFactory      屬性
    
        */
    
       String containerFactory() default "";
       /**
    
        * 需要監聽的Topic,可監聽多個,和 topicPattern 屬性互斥
    */
    
       String[] topics() default {};
       /**
    
        * 需要監聽的Topic的正則表達。和 topics,topicPartitions屬性互斥
        */
    
       String topicPattern() default "";
       /**
    
        * 可配置更加詳細的監聽信息,必須監聽某個Topic中的指定分區,或者從offset為200的偏移量開始監聽,可配置該參數, 和 topicPattern 屬性互斥
        */
    
       TopicPartition[] topicPartitions() default {};
       /**
    
        *偵聽器容器組 
    
        */
    
       String containerGroup() default "";
       /**
    
        * 監聽異常處理器,配置BeanName
    
        */
    
       String errorHandler() default "";
       /**
    
        * 消費組ID 
    
        */
    
       String groupId() default "";
       /**
    
        * id是否為GroupId
    
        */
    
       boolean idIsGroup() default true;
       /**
    
        * 消費者Id前綴
    
        */
    
       String clientIdPrefix() default "";
       /**
    
        * 真實監聽容器的BeanName,需要在 BeanName前加 "__"
    
        */
    
       String beanRef() default "__listener";
    }

    使用案例

    ConsumerRecord 類消費

    使用 ConsumerRecord 類接收有一定的好處,ConsumerRecord 類里面包含分區信息、消息頭、消息體等內容,如果業務需要獲取這些參數時,使用 ConsumerRecord 會是個不錯的選擇。如果使用具體的類型接收消息體則更加方便,比如說用 String 類型去接收消息體。

    這里我們編寫一個 Listener 方法,監聽 "topic1"Topic,并把 ConsumerRecord 里面所包含的內容打印到控制臺中:

    @Component
    
    public class Listener {
        private static final Logger log = LoggerFactory.getLogger(Listener.class);
        @KafkaListener(id = "consumer", topics = "topic1")
    
        public void consumerListener(ConsumerRecord record) {
    
            log.info("topic.quick.consumer receive : " + record.toString());
    
        }
    }

    批量消費

    批量消費在現實業務場景中是很有實用性的。因為批量消費可以增大 kafka 消費吞吐量, 提高性能。

    批量消費實現步驟:

    1、重新創建一份新的消費者配置,配置為一次拉取 10 條消息

    2、創建一個監聽容器工廠,命名為:batchContainerFactory,設置其為批量消費并設置并發量為 5,這個并發量根據分區數決定,必須小于等于分區數,否則會有線程一直處于空閑狀態。

    3、創建一個分區數為 8 的 Topic。

    4、創建監聽方法,設置消費 id 為 “batchConsumer”,clientID 前綴為“batch”,監聽“batch”,使用“batchContainerFactory” 工廠創建該監聽容器。

    @Component
    
    public class BatchListener {
        private static final Logger log= LoggerFactory.getLogger(BatchListener.class);
        private Map consumerProps() {
    
            Map props = new HashMap<>();
    
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    
            //一次拉取消息數量
    
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
    
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    
                    NumberDeserializers.IntegerDeserializer.class);
    
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    
                    StringDeserializer.class);
    
            return props;
    
        }
        @Bean("batchContainerFactory")
    
        public ConcurrentKafkaListenerContainerFactory listenerContainer() {
    
            ConcurrentKafkaListenerContainerFactory container
    
                    = new ConcurrentKafkaListenerContainerFactory();
    
            container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
    
            //設置并發量,小于或等于Topic的分區數
    
            container.setConcurrency(5);
    
            //必須 設置為批量監聽
    
            container.setBatchListener(true);
    
            return container;
    
        }
        @Bean
    
        public NewTopic batchTopic() {
    
            return new NewTopic("topic.batch", 8, (short) 1);
    
        }
        @KafkaListener(id = "batchConsumer",clientIdPrefix = "batch"
    
                ,topics = {"topic.batch"},containerFactory = "batchContainerFactory")
    
        public void batchListener(List data) {
    
            log.info("topic.batch  receive : ");
    
            for (String s : data) {
    
                log.info(  s);
    
            }
    
        }
    
    }

    監聽 Topic 中指定的分區

    使用 @KafkaListener 注解的 topicPartitions 屬性監聽不同的 partition 分區。

    @TopicPartition:topic-- 需要監聽的 Topic 的名稱,partitions &ndash; 需要監聽 Topic 的分區 id。

    partitionOffsets &ndash; 可以設置從某個偏移量開始監聽,@PartitionOffset:partition &ndash; 分區 Id,非數組,initialOffset &ndash; 初始偏移量。

    @Bean
    
    public NewTopic batchWithPartitionTopic() {
    
        return new NewTopic("topic.batch.partition", 8, (short) 1);
    
    }
    @KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",
    
            topicPartitions = {
    
                    @TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}),
    
                    @TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"},
    
                            partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100"))
    
            }
    
    )
    
    public void batchListenerWithPartition(List data) {
    
        log.info("topic.batch.partition  receive : ");
    
        for (String s : data) {
    
            log.info(s);
    
        }
    
    }

    注解方式獲取消息頭及消息體

    當你接收的消息包含請求頭,以及你監聽方法需要獲取該消息非常多的字段時可以通過這種方式。。這里使用的是默認的監聽容器工廠創建的,如果你想使用批量消費,把對應的類型改為 List 即可,比如 List data , List key。

    @Payload:獲取的是消息的消息體,也就是發送內容

    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):獲取發送消息的 key

    @Header(KafkaHeaders.RECEIVED_PARTITION_ID):獲取當前消息是從哪個分區中監聽到的

    @Header(KafkaHeaders.RECEIVED_TOPIC):獲取監聽的 TopicName

    @Header(KafkaHeaders.RECEIVED_TIMESTAMP):獲取時間戳

    @KafkaListener(id = "params", topics = "topic.params")
    
    public void otherListener(@Payload String data,
    
                             @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
    
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    
                             @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    
        log.info("topic.params receive : \n"+
    
                "data : "+data+"\n"+
    
                "key : "+key+"\n"+
    
                "partitionId : "+partition+"\n"+
    
                "topic : "+topic+"\n"+
    
                "timestamp : "+ts+"\n"
    
        );
    
    }

    使用 Ack 機制確認消費

    Kafka 是通過最新保存偏移量進行消息消費的,而且確認消費的消息并不會立刻刪除,所以我們可以重復的消費未被刪除的數據,當第一條消息未被確認,而第二條消息被確認的時候,Kafka 會保存第二條消息的偏移量,也就是說第一條消息再也不會被監聽器所獲取,除非是根據第一條消息的偏移量手動獲取。Kafka 的 ack 機制可以有效的確保消費不被丟失。因為自動提交是在 kafka 拉取到數據之后就直接提交,這樣很容易丟失數據,尤其是在需要事物控制的時候。

    使用 Kafka 的 Ack 機制比較簡單,只需簡單的三步即可:

    • 設置 ENABLE_AUTO_COMMIT_CONFIG=false,禁止自動提交

    • 設置 AckMode=MANUAL_IMMEDIATE

    • 監聽方法加入 Acknowledgment ack 參數

    4.使用 Consumer.seek 方法,可以指定到某個偏移量的位置

    @Component
    
    public class AckListener {
    
        private static final Logger log = LoggerFactory.getLogger(AckListener.class);
        private Map consumerProps() {
    
            Map props = new HashMap<>();
    
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            return props;
    
        }
        @Bean("ackContainerFactory")
    
        public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
    
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
    
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
    
            return factory;
    
        }
        @KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")
    
        public void ackListener(ConsumerRecord record, Acknowledgment ack) {
    
            log.info("topic.quick.ack receive : " + record.value());
    
            ack.acknowledge();
    
        }
    
    }

    解決重復消費

    上一節中使用 ack 手動提交偏移量時,假如 consumer 掛了重啟,那它將從 committed offset 位置開始重新消費,而不是 consume offset 位置。這也就意味著有可能重復消費。

    在 0.9 客戶端中,有 3 種 ack 策略:

    策略 1: 自動的,周期性的 ack。

    策略 2:consumer.commitSync(),調用 commitSync,手動同步 ack。每處理完 1 條消息,commitSync 1 次。

    策略 3:consumer. commitASync(),手動異步 ack。、

    那么使用策略 2,提交每處理完 1 條消息,就發送一次 commitSync。那這樣是不是就可以解決 “重復消費” 了呢?如下代碼:

    while (true) {
    
            List buffer = new ArrayList<>();
    
            ConsumerRecords records = consumer.poll(100);
    
            for (ConsumerRecord record : records) {
    
                buffer.add(record);
    
            }
    
            insertIntoDb(buffer);    //消除處理,存到db
    
            consumer.commitSync();   //同步發送ack
    
            buffer.clear();
    
        }
    
    }

    答案是否定的!因為上面的 insertIntoDb 和 commitSync 做不到原子操作:如果在數據處理完成,commitSync 的時候掛了,服務器再次重啟,消息仍然會重復消費。

         那么如何解決重復消費的問題呢?答案是自己保存 committed offset,而不是依賴 kafka 的集群保存 committed offset,把消息的處理和保存 offset 做成一個原子操作,并且對消息加入唯一 id, 進行判重。

    依照官方文檔, 要自己保存偏移量, 需要:

    • enable.auto.commit=false, 禁用自動 ack。

    • 每次取到消息,把對應的 offset 存下來。

    • 下次重啟,通過 consumer.seek 函數,定位到自己保存的 offset,從那開始消費。

    • 更進一步處理可以對消息加入唯一 id, 進行判重。

    到此,關于“spring kafka框架中@KafkaListener注解怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    沂水县| 中卫市| 屏南县| 永昌县| 砀山县| 灌云县| 伊宁市| 大荔县| 伊川县| 温宿县| 新野县| 渭源县| 宝丰县| 蓬溪县| 赤城县| 铜梁县| 科技| 蛟河市| 广东省| 保亭| 漠河县| 常德市| 龙胜| 阆中市| 江安县| 罗江县| 金溪县| 定远县| 原阳县| 融水| 淮阳县| 太湖县| 洱源县| 封开县| 镶黄旗| 阜新市| 东莞市| 广西| 枞阳县| 布拖县| 浙江省|