您好,登錄后才能下訂單哦!
本篇內容主要講解“MySQL特定表全量、增量數據同步到消息隊列怎么實現”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“MySQL特定表全量、增量數據同步到消息隊列怎么實現”吧!
既要同步原始全量數據,也要實時同步MySQL特定庫的特定表增量數據,同時對應的修改、刪除也要對應。
數據同步不能有侵入性:不能更改業務程序,并且不能對業務側有太大性能壓力。
應用場景:數據ETL同步、降低業務服務器壓力。
canal是阿里巴巴旗下的一款開源項目,純Java開發。基于數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。
工作原理:mysql主備復制實現
從上層來看,復制分成三步:
master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
slave將master的binary log events拷貝到它的中繼日志(relay log);
slave重做中繼日志中的事件,將改變反映它自己的數據。
原理相對比較簡單:
canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
mysql master收到dump請求,開始推送binary log給slave(也就是canal)
canal解析binary log對象(原始為byte流)
說明:
server代表一個canal運行實例,對應于一個jvm
instance對應于一個數據隊列 (1個server對應1..n個instance)
instance模塊:
eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)
eventSink (Parser和Store鏈接器,進行數據過濾,加工,分發的工作)
eventStore (數據存儲)
metaManager (增量訂閱&消費信息管理器)
1、mysql、kafka環境準備
2、canal下載:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
3、解壓:tar -zxvf canal.deployer-1.1.3.tar.gz
4、對目錄conf里文件參數配置
對canal.properties配置:
進入conf/example里,對instance.properties配置:
5、啟動:bin/startup.sh
6、日志查看:
1、開發對應的kafka消費者
package org.kafka; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; /** * * Title: KafkaConsumerTest * Description: * kafka消費者 demo * Version:1.0.0 * @author pancm * @date 2018年1月26日 */ public class KafkaConsumerTest implements Runnable { private final KafkaConsumer<String, String> consumer; private ConsumerRecords<String, String> msgList; private final String topic; private static final String GROUPID = "groupA"; public KafkaConsumerTest(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.7.193:9092"); props.put("group.id", GROUPID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<String, String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { int messageNo = 1; System.out.println("---------開始消費---------"); try { for (; ; ) { msgList = consumer.poll(1000); if (null != msgList && msgList.count() > 0) { for (ConsumerRecord<String, String> record : msgList) { //消費100條就打印 ,但打印的數據不一定是這個規律的 System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset()); // String v = decodeUnicode(record.value()); // System.out.println(v); //當消費了1000條就退出 if (messageNo % 1000 == 0) { break; } messageNo++; } } else { Thread.sleep(11); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } } public static void main(String args[]) { KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data"); Thread thread1 = new Thread(test1); thread1.start(); } /* * 中文轉unicode編碼 */ public static String gbEncoding(final String gbString) { char[] utfBytes = gbString.toCharArray(); String unicodeBytes = ""; for (int i = 0; i < utfBytes.length; i++) { String hexB = Integer.toHexString(utfBytes[i]); if (hexB.length() <= 2) { hexB = "00" + hexB; } unicodeBytes = unicodeBytes + "\\u" + hexB; } return unicodeBytes; } /* * unicode編碼轉中文 */ public static String decodeUnicode(final String dataStr) { int start = 0; int end = 0; final StringBuffer buffer = new StringBuffer(); while (start > -1) { end = dataStr.indexOf("\\u", start + 2); String charStr = ""; if (end == -1) { charStr = dataStr.substring(start + 2, dataStr.length()); } else { charStr = dataStr.substring(start + 2, end); } char letter = (char) Integer.parseInt(charStr, 16); // 16進制parse整形字符串。 buffer.append(new Character(letter).toString()); start = end; } return buffer.toString(); } }
2、對表bak1進行增加數據
CREATE TABLE `bak1` ( `vin` varchar(20) NOT NULL, `p1` double DEFAULT NULL, `p2` double DEFAULT NULL, `p3` double DEFAULT NULL, `p4` double DEFAULT NULL, `p5` double DEFAULT NULL, `p6` double DEFAULT NULL, `p7` double DEFAULT NULL, `p8` double DEFAULT NULL, `p9` double DEFAULT NULL, `p0` double DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 show create table bak1; insert into bak1 select '李雷abcv', `p1` , `p2` , `p3` , `p4` , `p5` , `p6` , `p7` , `p8` , `p9` , `p0` from moci limit 10
3、查看輸出結果:
到此,相信大家對“MySQL特定表全量、增量數據同步到消息隊列怎么實現”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。