您好,登錄后才能下訂單哦!
這篇文章主要介紹了springboot項目配置多個kafka的示例代碼的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇springboot項目配置多個kafka的示例代碼文章都會有所收獲,下面我們一起來看看吧。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency>
kafka.bootstrap-servers=localhost:9092 kafka.consumer.group.id=20230321 #可以并發消費的線程數 (通常與partition數量一致) kafka.consumer.concurrency=10 kafka.consumer.enable.auto.commit=false kafka.bootstrap-servers.pic=localhost:29092 kafka.consumer.group.id.pic=20230322_pic kafka.consumer.concurrency.pic=10 kafka.consumer.enable.auto.commit.pic=false
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.concurrency}") private int concurrency; @Value("${kafka.consumer.enable.auto.commit}") private String autoCommit; @Value("${kafka.bootstrap-servers}") private String bootstrapServer; @Value("${kafka.consumer.group.id.pic}") private String groupIdPic; @Value("${kafka.consumer.concurrency.pic}") private int concurrencyPic; @Value("${kafka.consumer.enable.auto.commit.pic}") private String autoCommitPic; @Value("${kafka.bootstrap-servers.pic}") private String bootstrapServerPic; @Bean public ConsumerFactory<String, String> consumerFactory() { String bootstrapServers = bootstrapServer; Map<String, Object> configProps = new HashMap<>(16); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } @Bean public ConsumerFactory<String, String> consumerFactoryPic() { String bootstrapServers = bootstrapServerPic; Map<String, Object> configProps = new HashMap<>(16); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactoryPic()); factory.setConcurrency(concurrencyPic); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } }
@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic") public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) { try { String jsonString = message.value(); if (StringUtils.isNoneBlank(jsonString)) { log.info("消費:{}",jsonString); //TODO .... } } catch (Exception e) { log.error(" receive topic error ", e); } finally { ack.acknowledge(); } } @KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory") public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) { try { if (StringUtils.isNoneBlank(message.value())) { //TODO .... } } catch (Exception e) { logger.error(" receive topic error ", e); } finally { ack.acknowledge(); } }
關于“springboot項目配置多個kafka的示例代碼”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“springboot項目配置多個kafka的示例代碼”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。