亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于Flink的MQ-Hive實時數據集成如何實現字節跳動

發布時間:2021-12-10 09:16:29 來源:億速云 閱讀:154 作者:小新 欄目:大數據

這篇文章主要介紹基于Flink的MQ-Hive實時數據集成如何實現字節跳動,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

在數據中臺建設過程中,一個典型的數據集成場景是將 MQ (Message Queue,例如 Kafka、RocketMQ 等)的數據導入到 Hive 中,以供下游數倉建設以及指標統計。由于 MQ-Hive 是數倉建設第一層,因此對數據的準確性以及實時性要求比較高。


本文主要圍繞 MQ-Hive 場景,針對目前字節跳動內已有解決方案的痛點,提出基于 Flink 的實時解決方案,并介紹新方案在字節跳動內部的使用現狀。

已有方案及痛點


 

字節跳動內已有解決方案如下圖所示,主要分了兩個步驟:

  1. 通過 Dump 服務將 MQ 的數據寫入到 HDFS 文件

  2. 再通過 Batch ETL 將 HDFS 數據導入到 Hive 中,并添加 Hive 分區

基于Flink的MQ-Hive實時數據集成如何實現字節跳動  

痛點

  1. 任務鏈較長,原始數據需要經過多次轉換最終才能進入 Hive
  2. 實時性比較差,Dump Service、Batch ETL 延遲都會導致最終數據產出延遲
  3. 存儲、計算開銷大,MQ 數據重復存儲和計算
  4. 基于原生 Java 打造,數據流量持續增長后,存在單點故障和機器負載不均衡等問題
  5. 運維成本較高,架構上無法復用公司內 Hadoop/Flink/Yarn 等現有基礎設施
  6. 不支持異地容災

基于 Flink 實時解決方案

優勢

針對目前公司傳統解決方案的痛點,我們提出基于 Flink 的實時解決方案,將 MQ 的數據實時寫入到 Hive,并支持事件時間以及 Exactly Once 語義。相比老方案,新方案優勢如下所示:

  1. 基于流式引擎 Flink 開發,支持 Exactly Once 語義
  2. 實時性更高,MQ 數據直接進入 Hive,無中間計算環節
  3. 減少中間存儲,整個流程數據只會落地一次
  4. 支撐 Yarn 部署模式,方便用戶遷移
  5. 資源管理彈性,方便擴容以及運維
  6. 支持雙機房容災

整體架構

整體架構如下圖所示,主要包括 DTS(Data Transmission Service) Source、DTS Core、DTS Sink 三大模塊,具體功能如下:

  1. DTS Source 接入不同 MQ 數據源,支持 Kafka、RocketMQ 等
  2. DTS Sink 將數據輸出到目標數據源,支持 HDFS、Hive 等
  3. DTS Core 貫穿整個數據同步流程,通過 Source 讀取源端數據,經過 DTS Framework 處理,最后通過 Sink 將數據輸出到目標端。
  4. DTS Framework 集成類型系統、文件切分、Exactly Once、任務信息采集、事件時間、臟數據收集等核心功能
  5. 支持 Yarn 部署模式,資源調度、管理比較彈性

基于Flink的MQ-Hive實時數據集成如何實現字節跳動  
DTS Dump架構圖  

Exactly Once

Flink 框架通過 Checkpoint 機制,能夠提供 Exactly Once 或者 At Least Once 語義。為了實現 MQ-Hive 全鏈路支持 Exactly-once 語義,還需要 MQ Source、Hive Sink 端支持 Exactly Once 語義。本文通過 Checkpoint + 2PC 協議實現,具體過程如下:

  1. 數據寫入時,Source 端從上游 MQ 拉取數據并發送到 Sink 端;Sink 端將數據寫入到臨時目錄中
  2. Checkpoint Snapshot 階段,Source 端將 MQ Offset 保存到 State 中;Sink 端關閉寫入的文件句柄,并保存當前 Checkpoint ID 到 State 中;
  3. Checkpoint Complete 階段,Source 端 Commit MQ Offset;Sink 端將臨時目錄中的數據移動到正式目錄下
  4. Checkpoint Recover 階段,加載最新一次成功的 Checkpoint 目錄并恢復 State 信息,其中 Source 端將 State 中保存的 MQ Offset 作為起始位置;Sink 端恢復最新一次成功的 Checkpoint ID,并將臨時目錄的數據移動到正式目錄下

■ 實現優化

在實際使用場景中,特別是大并發場景下,HDFS 寫入延遲容易有毛刺,因為個別 Task Snapshot 超時或者失敗,導致整個 Checkpoint 失敗的問題會比較明顯。因此針對 Checkpoint 失敗,提高系統的容錯性以及穩定性就比較重要。

這里充分利用 Checkpoint ID 嚴格單調遞增的特性,每一次做 Checkpoint 時,當前 Checkpoint ID 一定比以前大,因此在 Checkpoint Complete 階段,可以提交小于等于當前 Checkpoint ID 的臨時數據。具體優化策略如下:

  1. Sink 端臨時目錄為{dump_path}/{next_cp_id},這里 next_cp_id 的定義是當前最新的 cp_id + 1
  2. Checkpoint Snapshot 階段,Sink 端保存當前最新 cp_id 到 State,同時更新 next_cp_id 為 cp_id + 1
  3. Checkpoint Complete 階段,Sink 端將臨時目錄中所有小于等于當前 cp_id 的數據移動到正式目錄下
  4. Checkpoint Recover 階段,Sink 端恢復最新一次成功的 cp_id,并將臨時目錄中小于等于當前 cp_id 的數據移動到正式目錄下

類型系統

由于不同數據源支持的數據類型不一樣,為了解決不同數據源間的數據同步以及不同類型轉換兼容的問題,我們支持了 DTS 類型系統,DTS 類型可細化為基礎類型和復合類型,其中復合類型支持類型嵌套,具體轉換流程如下:

  1. 在 Source 端,將源數據類型,統一轉成系統內部的 DTS 類型
  2. 在 Sink 端,將系統內部的 DTS 類型轉換成目標數據源類型
  3. 其中 DTS 類型系統支持不同類型間的相互轉換,比如 String 類型與 Date 類型的相互轉換

基于Flink的MQ-Hive實時數據集成如何實現字節跳動  
DTS Dump架構圖  

Rolling Policy

Sink 端是并發寫入,每個 Task 處理的流量不一樣,為了避免生成太多的小文件或者生成的文件過大,需要支持自定義文件切分策略,以控制單個文件的大小。目前支持三種文件切分策略:文件大小、文件最長未更新時間、Checkpoint。

■ 優化策略

Hive 支持 Parquet、Orc、Text 等多種存儲格式,不同的存儲格式數據寫入過程不太一樣,具體可以分為兩大類:

  1. RowFormat:基于單條寫入,支持按照 Offset 進行 HDFS Truncate 操作,例如 Text 格式
  2. BulkFormat:基于 Block 寫入,不支持 HDFS Truncate 操作,例如 Parquet、ORC 格式

為了保障 Exactly Once 語義,并同時支持 Parquet、Orc、Text 等多種格式,在每次 Checkpoint 時,強制做文件切分,保證所有寫入的文件都是完整的,Checkpoint 恢復時不用做 Truncate 操作。

容錯處理

理想情況下流式任務會一直運行不需要重啟,但實際不可避免會遇到以下幾個場景:

  1. Flink 計算引擎升級,需要重啟任務
  2. 上游數據增加,需要調整任務并發度
  3. Task Failover

■ 并發度調整

目前 Flink 原生支持 State Rescale。具體實現中,在 Task 做 Checkpoint Snapshot 時,將 MQ Offset 保存到 ListState 中;Job 重啟后,Job Master 會根據 Operator 并發度,將 ListState 平均分配到各個 Task 上。

■ Task Failover

由于網絡抖動、寫入超時等外部因素的影響,Task 不可避免會出現寫入失敗,如何快速、準確的做 Task Failover 就顯得比較重要。目前 Flink 原生支持多種 Task Failover 策略,本文使用 Region Failover 策略,將失敗 Task 所在 Region 的所有 Task 都重啟。

異地容災


■ 背景


大數據時代,數據的準確性和實時性顯得尤為重要。本文提供多機房部署及異地容災解決方案,當主機房因為斷網、斷電、地震、火災等原因暫時無法對外提供服務時,能快速將服務切換到備災機房,并同時保障 Exactly Once 語義。

■ 容災組件


整體解決方案需要多個容災組件一起配合實現,容災組件如下圖所示,主要包括 MQ、YARN、HDFS,具體如下:

  1. MQ 需要支持多機房部署,當主機房故障時,能將 Leader 切換到備機房,以供下游消費
  2. Yarn 集群在主機房、備機房都有部署,以便 Flink Job 遷移
  3. 下游 HDFS 需要支持多機房部署,當主機房故障時,能將 Master 切換到備機房
  4. Flink Job 運行在 Yarn 上,同時任務 State Backend 保存到 HDFS,通過 HDFS 的多機房支持保障 State Backend 的多機房

 

  基于Flink的MQ-Hive實時數據集成如何實現字節跳動  

 

■ 容災過程

整體容災過程如下所示:

  1. 正常情況下,MQ Leader 以及 HDFS Master 部署在主機房,并將數據同步到備機房。同時 Flink Job 運行在主機房,并將任務 State 寫入到 HDFS 中,注意 State 也是多機房部署模式
  2. 災難情況下,MQ Leader 以及 HDFS Master 從主機房遷移到備災機房,同時 Flink Job 也遷移到備災機房,并通過 State 恢復災難前的 Offset 信息,以提供 Exactly Once 語義

基于Flink的MQ-Hive實時數據集成如何實現字節跳動  
基于Flink的MQ-Hive實時數據集成如何實現字節跳動  

事件時間歸檔

■ 背景

在數倉建設中,處理時間(Process Time)和事件時間(Event Time)的處理邏輯不太一樣,對于處理時間會將數據寫到當前系統時間所對應的時間分區下;對于事件時間,則是根據數據的生產時間將數據寫到對應時間分區下,本文也簡稱為歸檔。

在實際場景中,不可避免會遇到各種上下游故障,并在持續一段時間后恢復,如果采用 Process Time 的處理策略,則事故期間的數據會寫入到恢復后的時間分區下,最終導致分區空洞或者數據漂移的問題;如果采用歸檔的策略,會按照事件時間寫入,則沒有此類問題。

由于上游數據事件時間會存在亂序,同時 Hive 分區生成后就不應該再繼續寫入,因此實際寫入過程中不可能做到無限歸檔,只能在一定時間范圍內歸檔。歸檔的難點在于如何確定全局最小歸檔時間以及如何容忍一定的亂序。

■  全局最小歸檔時間


Source 端是并發讀取,并且一個 Task 可能同時讀取多個 MQ Partition 的數據,對于 MQ 的每一個 Parititon 會保存當前分區歸檔時間,取分區中最小值作為 Task 的最小歸檔時間,最終取 Task 中最小值,作為全局最小歸檔時間。

基于Flink的MQ-Hive實時數據集成如何實現字節跳動  

■ 亂序處理

為了支持亂序的場景,會支持一個歸檔區間的設置,其中 Global Min Watermark 為全局最小歸檔時間,Partition Watermark 為分區當前歸檔時間,Partition Min Watermark 為分區最小歸檔時間,只有當事件時間滿足以下條件時,才會進行歸檔:

  1. 事件時間大于全局最小歸檔時間
  2. 事件時間大于分區最小歸檔時間

 
基于Flink的MQ-Hive實時數據集成如何實現字節跳動  

 

Hive 分區生成

■ 原理

Hive 分區生成的難點在于如何確定分區的數據是否就緒以及如何添加分區。由于 Sink 端是并發寫入,同時會有多個 Task 寫同一個分區數據,因此只有當所有 Task 分區數據寫入完成,才能認為分區數據是就緒,本文解決思路如下:

  1. 在 Sink 端,對于每個 Task 保存當前最小處理時間,需要滿足單調遞增的特性
  2. 在 Checkpoint Complete 時,Task 上報最小處理時間到 JM 端
  3. JM 拿到所有 Task 的最小處理時間后,可以得到全局最小處理時間,并以此作為 Hive 分區的最小就緒時間
  4. 當最小就緒時間更新時,可判斷是否添加 Hive 分區

 
基于Flink的MQ-Hive實時數據集成如何實現字節跳動  

■ 動態分區

動態分區是根據上游輸入數據的值,確定數據寫到哪個分區目錄,而不是寫到固定分區目錄,例如 date={date}/hour={hour}/app={app}的場景,根據分區時間以及 app 字段的值確定最終的分區目錄,以實現每個小時內,相同的 app 數據在同一個分區下。

在靜態分區場景下,每個 Task 每次只會寫入一個分區文件,但在動態分區場景下,每個 Task 可能同時寫入多個分區文件。對于 Parque 格式的寫入,會先將數據寫到做本地緩存,然后批次寫入到 Hive,當 Task 同時處理的文件句柄過多時,容易出現 OOM。為了防止單 Task OOM,會周期性對文件句柄做探活檢測,及時釋放長時間沒有寫入的文件句柄。

基于Flink的MQ-Hive實時數據集成如何實現字節跳動  

Messenger

Messenger 模塊用于采集 Job 運行狀態信息,以便衡量 Job 健康度以及大盤指標建設。

元信息采集

元信息采集的原理如下所示,在 Sink 端通過 Messenger 采集 Task 的核心指標,例如流量、QPS、臟數據、寫入 Latency、事件時間寫入效果等,并通過 Messenger Collector 匯總。其中臟數據需要輸出到外部存儲中,任務運行指標輸出到 Grafana,用于大盤指標展示。

基于Flink的MQ-Hive實時數據集成如何實現字節跳動  

 

■ 臟數據收集

數據集成場景下,不可避免會遇到臟數據,例如類型配置錯誤、字段溢出、類型轉換不兼容等場景。對于流式任務來說,由于任務會一直運行,因此需要能夠實時統計臟數據流量,并且將臟數據保存到外部存儲中以供排查,同時在運行日志中采樣輸出。

■ 大盤監控


大盤指標覆蓋全局指標以及單個 Job 指標,包括寫入成功流量和 QPS、寫入 Latency、寫入失敗流量和 QPS、歸檔效果統計等,具體如下圖所示:

基于Flink的MQ-Hive實時數據集成如何實現字節跳動  
基于Flink的MQ-Hive實時數據集成如何實現字節跳動

以上是“基于Flink的MQ-Hive實時數據集成如何實現字節跳動”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

康平县| 西盟| 噶尔县| 措美县| 昂仁县| 苍溪县| 永年县| 侯马市| 苍梧县| 伊吾县| 贵溪市| 游戏| 乐昌市| 曲阳县| 淳化县| 赣榆县| 共和县| 翼城县| 灵寿县| 诸城市| 禹州市| 万山特区| 五台县| 江门市| 临澧县| 彭泽县| 青海省| 武鸣县| 翁牛特旗| 张家口市| 松溪县| 曲阳县| 思茅市| 阳谷县| 双江| 怀仁县| 桂林市| 仁怀市| 北川| 兴业县| 美姑县|