亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python如何實現消費kafka數據批量插入到es

發布時間:2021-08-05 15:13:40 來源:億速云 閱讀:292 作者:小新 欄目:開發技術

這篇文章將為大家詳細講解有關python如何實現消費kafka數據批量插入到es,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

1、es的批量插入

這是為了方便后期配置的更改,把配置信息放在logging.conf中

用elasticsearch來實現批量操作,先安裝依賴包,sudo pip install Elasticsearch3

from elasticsearch import Elasticsearch 
class ImportEsData:

  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self,hosts,index,type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type


  def set_date(self,data): 
    # 批量處理 
    # es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
    self.es.index(index=self.index,doc_type=self.index,body=data)

2、使用pykafka消費kafka

1.因為kafka是0.8,pykafka不支持zk,只能用get_simple_consumer來實現

2.為了實現多個應用同時消費而且不重消費,所以一個應用消費一個partition

3. 為是確保消費數據量在不滿足10000這個批量值,能在一個時間范圍內插入到es中,這里設置consumer_timeout_ms一個超時等待時間,退出等待消費阻塞。

4.退出等待消費阻塞后導致無法再消費數據,因此在獲取self.consumer 的外層加入了while True 一個死循環

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime


class KafkaPython:
  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")
  logger_data = logging.getLogger("data")

  def __init__(self):
    self.server = ConfigUtil().get("kafka","kafka_server")
    self.topic = ConfigUtil().get("kafka","topic")
    self.group = ConfigUtil().get("kafka","group")
    self.partition_id = int(ConfigUtil().get("kafka","partition"))
    self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
    self.consumer = None
    self.hosts = ConfigUtil().get("es","hosts")
    self.index_name = ConfigUtil().get("es","index_name")
    self.type_name = ConfigUtil().get("es","type_name")


  def getConnect(self):
    client = KafkaClient(self.server)
    topic = client.topics[self.topic]
    p = topic.partitions
    ps={p.get(self.partition_id)}

    self.consumer = topic.get_simple_consumer(
      consumer_group=self.group,
      auto_commit_enable=True,
      consumer_timeout_ms=self.consumer_timeout_ms,
      # num_consumer_fetchers=1,
      # consumer_id='test1',
      partitions=ps
      )
    self.starttime = datetime.datetime.now()


  def beginConsumer(self):
    print("beginConsumer kafka-python")
    imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
    #創建ACTIONS 
    count = 0
    ACTIONS = [] 

    while True:
      endtime = datetime.datetime.now()
      print (endtime - self.starttime).seconds
      for message in self.consumer:
        if message is not None:
          try:
            count = count + 1
            # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
            # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
            action = { 
              "_index": self.index_name, 
              "_type": self.type_name, 
              "_source": message.value
            }
            ACTIONS.append(action)
            if len(ACTIONS) >= 10000:
              imprtEsData.set_date(ACTIONS)
              ACTIONS = []
              self.consumer.commit_offsets()
              endtime = datetime.datetime.now()
              print (endtime - self.starttime).seconds
              #break
          except (Exception) as e:
            # self.consumer.commit_offsets()
            print(e)
            self.logger.error(e)
            self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
            # self.logger_data.error(message.value+"\n")
          # self.consumer.commit_offsets()


      if len(ACTIONS) > 0:
        self.logger.info("等待時間超過,consumer_timeout_ms,把集合數據插入es")
        imprtEsData.set_date(ACTIONS)
        ACTIONS = []
        self.consumer.commit_offsets()




  def disConnect(self):
    self.consumer.close()


from elasticsearch import Elasticsearch 
from elasticsearch.helpers import bulk
class ImportEsData:

  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self,hosts,index,type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type


  def set_date(self,data): 
    # 批量處理 
    success = bulk(self.es, data, index=self.index, raise_on_error=True) 
    self.logger.info(success)

3、運行

if __name__ == '__main__':
  kp = KafkaPython()
  kp.getConnect()
  kp.beginConsumer()
  # kp.disConnect()

注:簡單的寫了一個從kafka中讀取數據到一個list里,當數據達到一個閾值時,在批量插入到 es的插件

現在還在批量的壓測中。。。

關于“python如何實現消費kafka數據批量插入到es”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

茂名市| 仙桃市| 开原市| 弋阳县| 乐都县| 玉屏| 平凉市| 疏附县| 四川省| 蓝山县| 建湖县| 湟中县| 马鞍山市| 社会| 鱼台县| 磐安县| 大英县| 喀喇| 柘荣县| 吴旗县| 江源县| 安阳市| 长顺县| 高清| 会宁县| 商丘市| 商河县| 中牟县| 临安市| 定州市| 环江| 潼关县| 文山县| 南江县| 登封市| 仪征市| 宁蒗| 桂东县| 屏东县| 兴义市| 磴口县|