亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Springboot如何集成Kafka進行批量消費

發布時間:2021-12-14 14:04:44 來源:億速云 閱讀:1278 作者:iii 欄目:開發技術

本篇內容主要講解“Springboot如何集成Kafka進行批量消費”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Springboot如何集成Kafka進行批量消費”吧!

引入依賴

<dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.3.11.RELEASE</version>
            </dependency>

因為我的項目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。讀者可以根據下圖來自行選擇對應的版本。圖片更新可能不及時,詳情可查看spring-kafka 官方網站。

Springboot如何集成Kafka進行批量消費

注:這里有個踩坑點,如果引入包版本不對,項目啟動時會拋出org.springframework.core.log.LogAccessor 異常:

java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor

創建配置類

/**
     * kafka 配置類
     */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);

        @Value("${kafka.bootstrap.servers}")
        private String kafkaBootstrapServers;

        @Value("${kafka.group.id}")
        private String kafkaGroupId;

        @Value("${kafka.topic}")
        private String kafkaTopic;

        public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf";

        public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks";


        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // 設置并發量,小于或者等于 Topic 的分區數
            factory.setConcurrency(5);
            // 設置為批量監聽
            factory.setBatchListener(Boolean.TRUE);
            factory.getContainerProperties().setPollTimeout(30000);
            return factory;
        }

        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            //設置接入點,請通過控制臺獲取對應Topic的接入點。
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
            //設置SSL根證書的路徑,請記得將XXX修改為自己的路徑。
            //與SASL路徑類似,該文件也不能被打包到jar中。
            System.setProperty("java.security.auth.login.config", CONFIG_PATH);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH);

            //根證書存儲的密碼,保持不變。
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
            //接入協議,目前支持使用SASL_SSL協議接入。
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            //SASL鑒權方式,保持不變。
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            // 自動提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);
            //兩次Poll之間的最大允許間隔。
            //消費者超過該值沒有返回心跳,服務端判斷消費者處于非存活狀態,服務端將消費者從Consumer Group移除并觸發Rebalance,默認30s。
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
            //設置單次拉取的量,走公網訪問時,該參數會有較大影響。
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
            props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
            //每次Poll的最大數量。
            //注意該值不要改得太大,如果Poll太多數據,而不能在下次Poll之前消費完,則會觸發一次負載均衡,產生卡頓。
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
            //消息的反序列化方式。
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            //當前消費實例所屬的消費組,請在控制臺申請之后填寫。
            //屬于同一個組的消費實例,會負載消費消息。
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
            //Hostname校驗改成空。
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
            return props;
        }
    }

注:此處通過 factory.setConcurrency(5); 配置了并發量為 5 ,假設我們線上的 Topic 有 12 個分區。那么將會是 3 個線程分配到 2 個分區,2 個線程分配到 3 個分區,3 * 2 + 2 * 3 = 12。

Kafka 消費者

/**
     * kafka 消息消費類
     */
    @Component
    public class KafkaMessageListener {

        private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

        @KafkaListener(topics = {"${kafka.topic}"})
        public void listen(List<ConsumerRecord<String, String>> recordList) {
            for (ConsumerRecord<String,String> record : recordList) {
                // 打印消息的分區以及偏移量
                LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());
                String value = record.value();
                System.out.println("value = " + value);
                // 處理業務邏輯 ...
            }
        }
    }

因為我在配置類中設置了批量監聽,所以此處 listen 方法的入參是List:List<ConsumerRecord<String, String>>。

到此,相信大家對“Springboot如何集成Kafka進行批量消費”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

河源市| 连山| 靖州| 叶城县| 施秉县| 桂阳县| 苏尼特左旗| 洪泽县| 措勤县| 铁岭县| 弥勒县| 太仆寺旗| 贞丰县| 闻喜县| 禹州市| 和田县| 衢州市| 井冈山市| 泸定县| 武胜县| 湾仔区| 化隆| 平果县| 师宗县| 凯里市| 吉首市| 涟源市| 新野县| 清河县| 古浪县| 紫金县| 镶黄旗| 巨鹿县| 梓潼县| 交口县| 堆龙德庆县| 阳西县| 正镶白旗| 赤壁市| 鄂尔多斯市| 徐闻县|