要向Kafka發送消息,你需要使用Kafka的生產者API。以下是一個簡單的示例代碼,展示了如何使用Java向Kafka發送消息:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// Kafka 服務器的地址和端口
String bootstrapServer = "localhost:9092";
// 消息的主題
String topic = "test-topic";
// 設置生產者的配置屬性
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServer);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 創建 Kafka 生產者實例
Producer<String, String> producer = new KafkaProducer<>(props);
try {
// 發送一條消息到 Kafka
String message = "Hello, Kafka!";
producer.send(new ProducerRecord<>(topic, message));
System.out.println("消息發送成功: " + message);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 關閉 Kafka 生產者
producer.close();
}
}
}
在上面的示例中,我們首先設置了Kafka服務器地址和端口,以及要發送消息的主題。然后,我們創建了一個包含必要配置屬性的Properties
對象。接下來,我們使用這些配置屬性創建了一個Kafka生產者實例。最后,我們使用send
方法向Kafka發送一條消息,并在控制臺上打印出成功發送的消息。最后,我們關閉了Kafka生產者。
你可以根據自己的需求進行修改和擴展這個示例代碼。