亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python如何實現對kafka的基本操作

發布時間:2021-10-18 15:25:40 來源:億速云 閱讀:215 作者:小新 欄目:編程語言

這篇文章主要為大家展示了“python如何實現對kafka的基本操作”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“python如何實現對kafka的基本操作”這篇文章吧。

-- coding:utf-8 --

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic

"""生產者"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" %i
        producer.send(self.topic,key=str(i),value=msg)
    producer.close()

"""一個消費者消費一個topic"""
def consume(self):
    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
    print consumer.partitions_for_topic(self.topic)  #獲取test主題的分區信息
print consumer.topics()  #獲取主題列表
print consumer.subscription()  #獲取當前消費者訂閱的主題
print consumer.assignment()  #獲取當前消費者topic、分區信息
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,從第1個偏移量消費
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" 
        % (message.topic,message.partition,message.offset, message.key,message.value))

"""一個消費者訂閱多個topic """
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #訂閱要消費的主題
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                      message.offset, message.key,
                                      message.value))
"""消費者(手動拉取消息)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
        message = consumer.poll(timeout_ms=5)   #從kafka獲取消息
        if message:
        print message
        time.sleep(1)

def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()

以上是“python如何實現對kafka的基本操作”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

鸡西市| 黑龙江省| 高碑店市| 衡东县| 泗洪县| 牙克石市| 比如县| 南宁市| 新巴尔虎右旗| 马公市| 庆云县| 正安县| 同心县| 股票| 永清县| 曲沃县| 淮北市| 仙居县| 黑河市| 白玉县| 梅州市| 万全县| 格尔木市| 玉林市| 宜丰县| 牙克石市| 平昌县| 海盐县| 霸州市| 聊城市| 天峻县| 建湖县| 乡城县| 健康| 峡江县| 长宁区| 定安县| 日喀则市| 长乐市| 卫辉市| 博兴县|