要消費Kafka最新數據,可以使用kafka-python
庫。下面是一個簡單的示例代碼:
from kafka import KafkaConsumer
# 配置Kafka連接信息
bootstrap_servers = 'localhost:9092' # Kafka服務器地址和端口號
topic = 'my_topic' # Kafka主題名稱
group_id = 'my_group' # 消費者組ID
# 創建KafkaConsumer對象
consumer = KafkaConsumer(topic,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
auto_offset_reset='latest')
# 消費最新數據
for message in consumer:
print(message.value)
在上面的代碼中,首先需要配置Kafka的連接信息,包括Kafka服務器地址和端口號、主題名稱和消費者組ID。然后,創建一個KafkaConsumer
對象,并指定要消費的主題、Kafka服務器地址、消費者組ID和auto_offset_reset
參數設置為'latest'
,表示從最新的偏移量開始消費。
最后,通過for
循環遍歷消費者對象,打印每條消息的值。這樣就可以消費Kafka的最新數據了。