亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

EMR-Kafka中怎么利用Connect實現數據遷移

發布時間:2021-07-30 16:51:32 來源:億速云 閱讀:170 作者:Leah 欄目:大數據

本篇文章給大家分享的是有關EMR-Kafka中怎么利用Connect實現數據遷移,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

1.背景

流式處理中經常會遇到Kafka與其他系統進行數據同步或者Kafka集群間數據遷移的情景。使用EMR Kafka Connect可以方便快速的實現數據同步或者數據遷移。

Kafka Connect是一種可擴展的、可靠的,用于在Kafka和其他系統之間快速地進行流式數據傳輸的工具。例如可以使用Kafka Connect獲取數據庫的binglog數據,將數據庫的數據遷入Kafka集群,以同步數據庫的數據,或者對接下游的流式處理系統。同時,Kafka Connect提供的REST API接口可以方便的進行Kafka Connect的創建和管理。
Kafka Connect分為standalone和distributed兩種運行模式。standalone模式下,所有的worker都在一個進程中運行;相比之下,distributed模式更具擴展性和容錯性,是最常用的方式,也是生產環境推薦使用的模式。

本文介紹使用EMR Kafka Connect的REST API接口在Kafka集群間進行數據遷移,使用distributed模式。

2.環境準備

創建兩個EMR集群,集群類型為Kafka。EMR Kafka Connect安裝在task節點上,進行數據遷移的目的Kafka集群需要創建task節點。集群創建好后,task節點上EMR Kafka Connect服務會默認啟動,端口號為8083。

注意要保證兩個集群的網路互通,詳細的創建流程見創建集群https://help.aliyun.com/document_detail/28088.html

3.數據遷移

3.1準備工作

EMR Kafka Connect的配置文件路徑為/etc/ecm/kafka-conf/connect-distributed.properties

在源Kafka集群創建需要同步的topic,例如

EMR-Kafka中怎么利用Connect實現數據遷移

另外,Kafka Connect會將offsets, configs和任務狀態保存在topic中,topic名對應配置文件中的offset.storage.topic、config.storage.topic 和status.storage.topic三個配置項。默認的,Kafka Connect會自動的使用默認的partition和replication factor創建這三個topic。

3.2創建Kafka Connect

在目的Kafka集群的task節點(例如emr-worker-3節點),使用curl命令通過json數據創建一個Kafka Connect。

curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-test", "config": { "connector.class": "EMRReplicatorSourceConnector", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "src.kafka.bootstrap.servers": "${src-kafka-ip}:9092", "src.zookeeper.connect": "${src-kafka-curator-ip}:2181", "dest.zookeeper.connect": "${dest-kafka-curator-ip}:2181", "topic.whitelist": "${source-topic}", "topic.rename.format": "${dest-topic}", "src.kafka.max.poll.records": "300" } }' http://emr-worker-3:8083/connectors

json數據中,name字段代表創建的connect的名稱,此處為connect-test;config字段需要根據實際情況進行配置,其中的變量說明如下表

字段說明
topic.whitelist源Kafka集群中需要同步的topic,多個topic用逗號隔開,例如connect
topic.rename.format可選配置項,目的Kafka集群中同步后的topic,默認值為${topic.whitelist}.replica。例如源topic為connect,同步后的topic為connect.replica
src.kafka.bootstrap.servers源Kafka集群broker地址
src.zookeeper.connect源Kafka集群安裝了zookeeper服務的節點內網IP
dest.zookeeper.connect目的Kafka集群安裝了zookeeper服務的節點內網IP

3.3查看Kafka Connect

查看所有的Kafka Connect

EMR-Kafka中怎么利用Connect實現數據遷移


查看創建的connect-test的狀態

EMR-Kafka中怎么利用Connect實現數據遷移

查看創建的connect-test的狀態查看task的信息

EMR-Kafka中怎么利用Connect實現數據遷移

3.4數據同步

在源Kafka集群創建需要同步的數據。

EMR-Kafka中怎么利用Connect實現數據遷移

3.5查看同步結果

在目的Kafka集群消費同步的數據。

EMR-Kafka中怎么利用Connect實現數據遷移

可以看到,在源Kafka集群發送的100000條數據已經遷移到了目的Kafka集群。

以上就是EMR-Kafka中怎么利用Connect實現數據遷移,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

监利县| 伊川县| 循化| 巩留县| 德惠市| 丰原市| 义乌市| 汉沽区| 呈贡县| 济阳县| 太原市| 东安县| 铁力市| 赤峰市| 铜鼓县| 上林县| 石屏县| 徐州市| 建德市| 林周县| 杭锦后旗| 泸水县| 台南县| 社会| 扎赉特旗| 黄浦区| 荔波县| 白水县| 昌宁县| 从化市| 恩施市| 沾益县| 蒙城县| 大港区| 临武县| 朝阳县| 瑞丽市| 抚松县| 咸阳市| 武清区| 河源市|