亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka Connect及FileConnector的示例分析

發布時間:2021-12-15 09:19:52 來源:億速云 閱讀:211 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關Kafka Connect及FileConnector的示例分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

一. Kafka Connect簡介

Kafka是一個使用越來越廣的消息系統,尤其是在大數據開發中(實時數據處理和分析)。為何集成其他系統和解耦應用,經常使用Producer來發送消息到Broker,并使用Consumer來消費Broker中的消息。Kafka Connect是到0.9版本才提供的并極大的簡化了其他系統與Kafka的集成。Kafka Connect運用用戶快速定義并實現各種Connector(File,Jdbc,Hdfs等),這些功能讓大批量數據導入/導出Kafka很方便。

Kafka Connect及FileConnector的示例分析

如圖中所示,左側的Sources負責從其他異構系統中讀取數據并導入到Kafka中;右側的Sinks是把Kafka中的數據寫入到其他的系統中。

二. 各種Kafka Connector

Kafka Connector很多,包括開源和商業版本的。如下列表中是常用的開源的Connector

ConnectorsReferences
JdbcSource, Sink
Elastic SearchSink1, Sink2, Sink3
CassandraSource1, Source 2, Sink1, Sink2
MongoDBSource
HBaseSink
SyslogSource
MQTT (Source)Source
Twitter (Source)Source, Sink
S3Sink1, Sink2

商業版的可以通過Confluent.io獲得

三. 示例

3.1 FileConnector Demo

 本例演示如何使用Kafka Connect把Source(test.txt)轉為流數據再寫入到Destination(test.sink.txt)中。如下圖所示:

Kafka Connect及FileConnector的示例分析

本例使用到了兩個Connector:

  • FileStreamSource:從test.txt中讀取并發布到Broker中

  • FileStreamSink:從Broker中讀取數據并寫入到test.sink.txt文件中 其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
3.2 運行Demo

需要熟悉Kafka的一些命令行,參考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)

3.2.1 啟動Kafka Broker
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/
[root@localhost kafka_2.11-0.11.0.0]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &
3.2.2 啟動Source Connector和Sink Connector
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

3.3.3 打開console-consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
3.3.4 寫入到test.txt文件中,并觀察3.3.3中的變化
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
3.3.3中打開的窗口輸出如下
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
3.3.5 查看test.sink.txt
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line

關于Kafka Connect及FileConnector的示例分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

定南县| 合山市| 涪陵区| 固镇县| 邻水| 郴州市| 唐海县| 宁安市| 博白县| 盐山县| 临颍县| 新营市| 内乡县| 永清县| 鲁甸县| 潞城市| 庆云县| 宿迁市| 望江县| 招远市| 沂水县| 遵化市| 通州区| 八宿县| 蕲春县| 锡林浩特市| 宁南县| 三明市| 阿拉善右旗| 个旧市| 闽侯县| 崇信县| 辰溪县| 木里| 新密市| 花莲市| 漠河县| 富顺县| 巴里| 阿图什市| 六枝特区|