要消費Kafka數據并將其寫入數據庫,可以按照以下步驟進行操作:
pip install kafka-python
from kafka import KafkaConsumer
import json
import pymysql
consumer = KafkaConsumer('<topic_name>', bootstrap_servers='<kafka_server_address>')
conn = pymysql.connect(host='<db_host>', port=<db_port>, user='<db_user>', password='<db_password>', db='<db_name>')
cursor = conn.cursor()
for message in consumer:
# 解析JSON格式的消息
data = json.loads(message.value)
# 提取所需的數據字段
field1 = data['field1']
field2 = data['field2']
# ...
# 構造插入數據庫的SQL語句
sql = "INSERT INTO <table_name> (field1, field2) VALUES (%s, %s)"
values = (field1, field2)
# 執行SQL語句
cursor.execute(sql, values)
conn.commit()
cursor.close()
conn.close()
consumer.close()
以上是一個簡單的示例,根據實際情況可能需要根據需要進行一些調整,如處理消息的格式、解析更多字段等。