亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spring?kafka?@KafkaListener如何使用

發布時間:2023-02-25 13:49:45 來源:億速云 閱讀:163 作者:iii 欄目:開發技術

今天小編給大家分享一下spring kafka @KafkaListener如何使用的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

說明

  • 從2.2.4版開始,您可以直接在注釋上指定Kafka使用者屬性,這些屬性將覆蓋在使用者工廠中配置的具有相同名稱的所有屬性。您不能通過這種方式指定group.id和client.id屬性。他們將被忽略;

  • 可以使用#{…}或屬性占位符(${…})在SpEL上配置注釋上的大多數屬性。

比如:

   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
            clientIdPrefix = "myClientId")

屬性concurrency將會從容器中獲取listen.concurrency的值,如果不存在就默認用3

@KafkaListener詳解

id 監聽器的id

①. 消費者線程命名規則

填寫:

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 線程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消費

沒有填寫ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 線程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

②.在相同容器中的監聽器ID不能重復

否則會報錯

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

③.會覆蓋消費者工廠的消費組GroupId

假如配置文件屬性配置了消費組kafka.consumer.group-id=BASE-DEMO
正常情況它是該容器中的默認消費組
但是如果設置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么當前消費者的消費組就是consumer-id7 ;

當然如果你不想要他作為groupId的話 可以設置屬性idIsGroup = false;那么還是會使用默認的GroupId;

④. 如果配置了屬性groupId,則其優先級最高

 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

例如上面代碼中最終這個消費者的消費組GroupId是 “groupId-test”

該id屬性(如果存在)將用作Kafka消費者group.id屬性,并覆蓋消費者工廠中的已配置屬性(如果存在)您還可以groupId顯式設置或將其設置idIsGroup為false,以恢復使用使用者工廠的先前行為group.id。

groupId 消費組名

指定該消費組的消費組名; 關于消費組名的配置可以看看上面的 id 監聽器的id

如何獲取消費者 group.id

在監聽器中調用KafkaUtils.getConsumerGroupId()可以獲得當前的groupId; 可以在日志中打印出來; 可以知道是哪個客戶端消費的;

topics 指定要監聽哪些topic(與topicPattern、topicPartitions 三選一)

可以同時監聽多個
topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 匹配Topic進行監聽(與topics、topicPartitions 三選一) topicPartitions 顯式分區分配

可以為監聽器配置明確的主題和分區(以及可選的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

上面例子意思是 監聽topic1的0,1分區;監聽topic2的第0分區,并且第1分區從offset為100的開始消費;

errorHandler 異常處理

實現KafkaListenerErrorHandler; 然后做一些異常處理;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    	//do someting
        return null;
    }
}

調用的時候 填寫beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 監聽器工廠

指定生成監聽器的工廠類;

例如我寫一個 批量消費的工廠類

    /**
     * 監聽器工廠 批量消費
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        //設置為批量消費,每個批次數量在Kafka配置參數中設置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

使用containerFactory = "batchFactory"

clientIdPrefix 客戶端前綴

會覆蓋消費者工廠的kafka.consumer.client-id屬性; 最為前綴后面接 -n n是數字

concurrency并發數

會覆蓋消費者工廠中的concurrency ,這里的并發數就是多線程消費; 比如說單機情況下,你設置了3; 相當于就是啟動了3個客戶端來分配消費分區;分布式情況 總線程數=concurrency*機器數量; 并不是設置越多越好,具體如何設置請看Java concurrency之集合

    /**
     * 監聽器工廠 
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)

雖然使用的工廠是concurrencyFactory(concurrency配置了6); 但是他最終生成的監聽器數量 是1;

properties 配置其他屬性

kafka中的屬性看org.apache.kafka.clients.consumer.ConsumerConfig ;
同名的都可以修改掉;

用法

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = {
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

@KafkaListener使用

KafkaListenerEndpointRegistry

    @Autowired
    private KafkaListenerEndpointRegistry registry;
       //.... 獲取所有注冊的監聽器
        registry.getAllListenerContainers();

設置入參驗證器

當您將Spring Boot與驗證啟動器一起使用時,將LocalValidatorFactoryBean自動配置:如下

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

使用

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

spring-kafka官方文檔

擴展:Spring for Apache Kafka @KafkaListener使用及注意事項

官方文檔:   https://docs.spring.io/spring-kafka/reference/html/

 @KafkaListener

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

If, say, six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more information.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.

示例:

   demo類:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}</code>

配置類及注解:
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

以上就是“spring kafka @KafkaListener如何使用”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

溆浦县| 桑植县| 威信县| 都江堰市| 贡觉县| 阿克陶县| 乌拉特后旗| 万全县| 育儿| 哈巴河县| 陆良县| 南平市| 富锦市| 罗山县| 汝州市| 汉阴县| 获嘉县| 库尔勒市| 鄂伦春自治旗| 花垣县| 陆川县| 浪卡子县| 浑源县| 湘乡市| 淮安市| 普兰县| 平顺县| 高淳县| 澳门| 西华县| 凉山| 莲花县| 正镶白旗| 辽阳县| 福海县| 依兰县| 乌兰察布市| 张家界市| 海安县| 昌江| 宁陕县|