您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關如何進行Canal binlog日志的Dump流程分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
Canal 的 dump 支持串行和并行模式兩種模式,本篇重點梳理 dump 的核心流程,以便對 dump 過程有一個充分的了解,更好的理解 Canal 的實現原理與細節,下一篇中將重點關注Canal是如何引入并行模式來提高dump的性能,即并行編程相關的技巧。
從前面的文章我們得知 Canal binlog 日志解析的基本流程如下圖所示:
在 Canal 中 dump 方法聲明如下:
String binlogfilename
binlog 文件名稱,例如 mysql-bin.000038。
Long binlogPosition
在文件中的偏移量。
SinkFunction func
每解析出一條binlog日志的處理函數。
接下來我們直奔主題,一起來看一下 MysqlConnection 關于 dump 的實現流程。
set wait_timeout=9999999
連接空閑超時時間,默認為8消息,用于 Canal Slave 的等待超時時間遠大于默認值。
set net_write_timeout=1800
網絡寫請求超時時間,針對正在進行數據讀寫的連接,該值默認為 60s。
set net_read_timeout=1800
網絡讀請求超時時間,針對正在進行數據讀寫的連接,該值默認為 30s。
set names 'binary'
設置服務端返回結果時不做編碼轉化,直接按照數據庫的二進制編碼進行發送,由客戶端自己根據需求進行編碼轉化。
set @master_binlog_checksum= @@global.binlog_checksum
設置master_binlog_checksum,因為在mysql5.6之后為binlog引入了checksum機制,例如crc32,canal作為mysql slave,需要與服務端相關參數保持一致。
set @slave_uuid=uuid()
canal相對與mysql數據庫服務而言就是一個從服務器,這個指令用于設置server_id,使用uuid,避免server_id重復。
SET @master_heartbeat_period=15
設置客戶端與服務端心跳發送間隔,默認為15s。
Step2:從主庫查詢binlog checksum,具體向主庫發送 select @@global.binlog_checksum 語句。
Step3:向MySQL Master 注冊從節點,告知客戶端的host、port、用戶名與密碼、serverId,具體實現是發送命令CODE為 0x15。
Step4:向 MySQL Master 發送 dump 請求,MySQL是基于請求與應答模式,發送請求命令后,就會向網絡通道中寫入響應請求。(在這里大家不妨先大概思考一下如何讀取 dump 命令的返回值,這部分雖然涉及到網絡相關的知識,我在這邊會稍微簡單提一下)。
Step5:構建 DirectLogFetcher對象,實現基于 socket 的日志拉取服務,并構建 LogDecoder 對象,用于解析 binlog 日志。
Step6:使用 while 循環反復拉取消息,通過通過 LogDecoder 對二進制流進行解析,提取一條完整的binlog事件,交給 SinkFunction 去處理,并且如果開啟了半同步機制,則需要向master發送ACK。既然是while循環,該方法的退出條件還是值得我們關注的:
fetch.fetch()方法返回 false
SinkFunction 的 sink 方法 false,SinkFunction的詳細處理流程將在下文介紹,這里先告知返回false的情況是 binlog 日志解析線程已停止運行。
上面粗略的介紹了 dump 命令的幾個核心關鍵步驟,要想詳細掌握其實現細節,我們必須繼續深入探討如下幾個問題:
DirectLogFetcher 內部工作機制
LogDecoder binlog 日志解析
發送Dump底層網實現思路
LogBuffer
日志buffer,主要定義如下屬性:
byte[] buffer
緩存區中數據容器。
int origin
當前buffer中的讀指針
int limit
當前buffer的最大可讀可寫指針
int position
當前buffer的寫指針。
int semival
是否需要發送ACK(用于半同步)。
LogBuffer封裝了字節相關的操作,不僅定義了上面的屬性,也定義了字節讀取相關眾多API,其截圖如下:
LogFetcher binlog日志抓取抽象類,定義了如下關鍵屬性與抽象方法。
int DEFAULT_INITIAL_CAPACITY
LogBuffer中的初始容量,默認為8K。
float DEFAULT_GROWTH_FACTOR
容量增長因子,默認為 2.0。
int BIN_LOG_HEADER_SIZE
binlog日志條目 header 的長度,固定為4字節。
float factor
增長因子。
public abstract boolean fetch()
抓取binlog日志。
public abstract void close()
關閉 Fetch。
DirectLogFetcher Canal LogFetcher模式實現類,其核心屬性如下:
SocketChannel channel
網絡通道,用于發送dump請求的網絡通道。
boolean issemi = false
是否開啟半同步。
接下來我們重點剖析 DirectLogFetcher 的 fetch 方法,來探究其實現原理。
在研究DirectLogFetcher的fetch方法之前,我們先重點跟蹤一下其內部網絡讀寫方法fetch0方法,該方法是具體與網絡讀寫相關的實現。
int off
從通道中讀取到的內容放入到buffer中的起始位置
int len
期望從通道中讀取的字節長度。
該方法的實現關鍵點如下:
首先先確保接收緩存區有足夠的剩余空間,如果空間不足,則進行擴容。
然后從通道中讀取指定長度的字節。
接下來我們來重點看一下DirectLogFetcher的fetch的實現流程。
從上面的流程來看,DirectLogFetcher#fetch 方法結束后,就將進入到LogDecoder中。經過一次DirectLogFetcher#fetch方法后,即取回一條binlog日志,即二進制流,接下來就根據binlog協議對其解析。本文暫不深入該方法,如果大家想深入數據庫中間件方面,可以作為一個很好的示例,面向MySQL通信協議進行編程。
通過 LogDecoder從中解析一個事件后,會調用SinkFunction的sink方法,如果該方法返回 false,一次dump請求將介紹,接下來我們看一下其sink方法。
首先需要調用EventSink組件將解析出來的數據傳入EventSink。
EventSink組件處理成功后,會提交解析位點。
關于如何進行Canal binlog日志的Dump流程分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。