要使用Scala與Kafka構建實時數據管道,您可以按照以下步驟操作:
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
// Kafka 生產者
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val topic = "test"
val key = "key1"
val value = "value1"
val record = new ProducerRecord[String, String](topic, key, value)
producer.send(record)
producer.close()
// Kafka 消費者
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092")
consumerProps.put("group.id", "test-group")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](consumerProps)
val topics = List("test")
consumer.subscribe(topics)
while (true) {
val records = consumer.poll(100)
for (record <- records.asScala) {
println(record.key() + ": " + record.value())
}
}
通過以上步驟,您可以使用Scala與Kafka構建實時數據管道。希望以上信息對您有所幫助。