亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

java中kafka怎么使用

小億
143
2023-09-13 20:21:07
欄目: 編程語言

在Java中使用Kafka,首先需要添加Kafka的依賴項。你可以在Maven的pom.xml文件中添加以下依賴項:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>

接下來,你可以使用Kafka的Java客戶端來編寫代碼。以下是一個簡單的示例,演示了如何使用Java發送和接收消息:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
public class KafkaExample {
private static final String TOPIC = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
public static void main(String[] args) {
// 創建生產者
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(producerProps);
// 發送消息
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.printf("Sent record to topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
// 關閉生產者
producer.close();
// 創建消費者
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// 訂閱主題并消費消息
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received record: key=%s, value=%s%n", record.key(), record.value());
}
}
}
}

在上面的示例中,我們首先創建了一個生產者,并使用ProducerConfig類的常量來配置生產者的屬性,例如Kafka集群的地址、鍵和值的序列化方式等。然后,我們創建了一個ProducerRecord對象,指定要發送的主題、鍵和值。我們調用生產者的send()方法來發送消息,并通過Callback來處理發送結果。最后,我們關閉了生產者。

然后,我們創建了一個消費者,并使用ConsumerConfig類的常量來配置消費者的屬性,例如Kafka集群的地址、鍵和值的反序列化方式、消費者組等。我們訂閱了一個主題,并在一個無限循環中調用poll()方法來獲取消息。我們遍歷消息并進行處理。

請注意,這只是一個簡單的示例,用于演示如何使用Java操作Kafka。在實際應用中,你可能需要更復雜的邏輯來處理消息,并使用更多的配置選項來優化性能和確保可靠性。

0
留坝县| 普陀区| 丰原市| 江达县| 陆丰市| 铜川市| 德清县| 邵阳县| 黔西县| 仲巴县| 涿州市| 西和县| 昌宁县| 泗水县| 阿拉善盟| 油尖旺区| 松滋市| 安平县| 内丘县| 海阳市| 台中市| 九台市| 澄迈县| 张家港市| 桂林市| 岳池县| 三穗县| 柞水县| 红河县| 壤塘县| 咸宁市| 宁津县| 清流县| 清徐县| 石景山区| 乌鲁木齐市| 博客| 澄城县| 汕尾市| 朝阳县| 临沧市|