您好,登錄后才能下訂單哦!
作者:張光輝
本文將為大家展示字節跳動公司怎么把Storm從Jstorm遷移到Flink的整個過程以及后續的計劃。你可以借此了解字節跳動公司引入Flink的背景以及Flink集群的構建過程。字節跳動公司是如何兼容以前的Jstorm作業以及基于Flink做一個任務管理平臺的呢?本文將一一為你揭開這些神秘的面紗。
本文內容如下:
引入Flink的背景
Flink集群的構建過程
下面這幅圖展示的是字節跳動公司的業務場景
cdn.xitu.io/2019/4/25/16a5370e3c99fefa?w=942&h=486&f=jpeg&s=102343">
首先,應用層有廣告,也有AB測,也有推送和數據倉庫的一些業務。然后在使用J storm的過程中,增加了一層模板主要應用于storm的計算模型,使用的語言是python。所以說中間相對抽象了一個schema,跑在最下面一層J storm計算引擎的上面。
字節跳動公司有很多J-storm集群,在當時17年7月份的時候,也就是在計劃遷移到Flink之前,J storm集群的規模大概是下圖所示的規模級別,當時已經有5000臺機器左右了。
接下來,介紹下遷移Flink的整個過程。先詳細地介紹一下當時J-Storm是怎么用的。
上面是一個word count的例子:左邊是一個目錄結構,這個目錄結構在resources下面,里面的Spout/Bolt的邏輯都是一些python腳本寫的。然后在最外層還有一個topology_online.yaml配置文件。
這個配置文件是用來干什么的?就是把所有的Spout和Bolt串聯起來構成一個有向無關圖,也就是DAG圖。這就是使用J storm時的整個目錄結構,大部分用戶都是這樣用的。右邊是Spout和Bolt的邏輯,其實是抽象出來了一個函數,就在這里面寫業務方面的函數,然后將tuple_batch也就是上游流下來的數據去做一些計算邏輯。
下面詳細介紹一下配置文件的信息,其實我們有整個拓撲結構拓撲的信息,比如說作業名叫什么,作業需要多少資源,需要多少work數。這里面會有單個的spout和Bolt的配置信息,比如是消費的topic還是一些并發度?
除了這些信息還有整個這個數據流的流轉,比如說spout的輸出,輸出messsage的消息等等。最后還有整個的Spout到Bolt之間的shuffle邏輯。這就是我們之前Jstorm的整個使用方式。最后會把整個目錄結構里面的內容去解析出來,根據配置文件把整個storm的拓撲結構構建出來,然后提交到集群上面去跑。
使用Jstorm集群遇到了什么問題呢?第一個問題,因為我們當時是用使用python寫的代碼,整個集群是沒有內存隔離的,job和work之間是沒有內存限制的。比如說在實際過程中會經常遇到一個用戶,他可能代碼寫的有問題導致一個work可能占了70G內存,把機器的內存占了1/3。第二個問題就是說業務團隊之間沒有擴大管理,預算和審核是無頭緒的。我們當時都是都是跑在一個大集群上面,然后個別業務是單獨跑在一些小集群,但是我們每次都是資源不足,也沒辦法梳理這個預算。
第三個問題就是集群過多,運維平臺化做得不太好,都是靠人來運維的。這個時候集群多了基本上是管不過來的。
第四個問題就是說我們用python寫的代碼,有些性能比較差。但是我們在Storm的基礎上面去推廣這個Java也比較難,因為我們部分同事實際上是不認可Java的,因為他覺得java開發速度太慢了。
我們當時想解決上面的問題,一個思路是把Jstorm放在yarn上面,直接把Jstorm在yarn上面兼容做這一套。后來因為知道阿里在用Flink所以去調研Flink,發現了Flink的一些優勢,所以想嘗試用Flink解決存在的問題。
使用Flink首先第一個問題可以成功解決,因為Flink作業是跑在yarn上面的,這就解決了內存隔離的問題。然后Yarn也是支持隊列的,我們可以根據業務去劃分隊列,這樣我們的擴大預算審核的問題得到解決了。我們也不需要自己運維一個集群了,因為有yarn去管理我們的資源,這樣也節省了運維成員。在此基礎上還可以做一些物理隔離隊列,其實物理隔離隊列現在也遇到了問題。因為物理隔離隊列只是說這個機器隔離了,但是相當于是機柜也沒有隔離網絡帶寬也沒有隔離,所以說即使是物理隔離隊列,現在也遇到比如說和離線作業共用機柜的時候,這個機柜的出口帶寬被打滿的問題。針對這些問題,我們后續可能想在這個離線離線集群上面做QOS這種流量級別的方式來解決這個問題。
Flink實際上是可以兼容Storm的,比如說之前的歷史作業是可以遷移過來的,不需要維護兩套計算引擎。Flink支持一些高優先級的API比如說支持SQL以及窗口等特性包括說checkpoint。我們頭條的業務對exactly-once的需求不是特別的強烈。
以上就是Flink的優勢,于是我們就決定從J storm往Flink去遷移。
在遷移的過程中,第一件事情是先把Flink集群建立起來。一開始肯定都是追求穩定性,比如說把離線的yarn集群隔離開,然后不依賴于HDFS也可以把Hdfs線上的name node, name space隔離出來。然后我們梳理了原來storm上面的作業,哪些作業屬于不同的業務,然后映射到不同的隊列里面去,最后把一些特殊的隊列也隔離開來。這是我們準備這個Fink集群的時候考慮的幾個點。
下面就考慮Flink怎么兼容J storm,然后把它遷移過來。
我們當時Flink用的是1.32版本,因為Flink有Flink-storm這個工程,它能把Storm作業轉化成Flink作業,我們就借鑒這些技術上實現了一個Flink –jstorm。相當于把一個J storm的拓撲結構轉化成了一個Flink job。只做完這件事情是不夠的,因為我們有一系列的外圍工具需要去對齊。比如說之前提交作業的時候是通過一個腳本提交的讓用戶去屏蔽一些其他的參數。使用 flink的話我們同樣也是需要構建這么一個腳本,然后去提交Flink Job,最后停止flink Job。第三點是構建flink job外圍工具,自動注冊報警,比如說消費延遲報警,自動注冊這個Dashboard以及一些log service,所有的這些為外圍工具都要和原來的服務去對齊。
對齊完之后,我們需要構建一個遷移腳本,遷移的過程中最困難的是資源配置這一塊。因為原來Storm用了多少資源,Storm怎么配,這對于遷移的用戶來說,如果是第一次做肯定是不了解這些東西。因此我們寫這么一個腳本,幫用戶生成它Flink集群里面對應的資源使用情況。這些工作做完了之后,我們就開始去遷移。到現在為止,整體遷移完了,還剩下十個左右的作業沒有遷移完。現在集群規模達到了大概是6000多臺。
在遷移的過程中我們有一些其他優化,比如說J storm是能夠支持task和work維度的重啟的,Flink這一塊做得不是特別好。我們在這方面做了一些優化實現了一個single task和single tm粒度的重啟,這樣就解決部分作業因為task重啟導致整個作業全部重啟。
遷移完之后,我們又構建了一個流式管理平臺。這個平臺是為了解決實際過程中遇到了一些問題,比如說整個機群掛了無法確定哪些作業在上面跑著,也通知不到具體的用戶,有些用戶作業都不知道自己提交了哪些作業。我們構建流式作業的時候目標實際上就是和其他的管理平臺是一樣的,比如說我們提供一些界面操作,然后提供一個版本管理,就是為了方便方便用戶升級和回滾的操作,我們還提供了一站式的查問題的工具:把一些用戶需要的信息都聚合在一個頁面上面,防止用戶不斷跳來跳去以及避免不同系統之間的切換。有一些歷史記錄之前不管是跑在yarn上面還是跑到storm上面,我一個作業被別人kill到了,其實我都是不知道的。針對這個問題我們提供了一些歷史操作記錄的一些目標。
設計這個管理平臺的時候,我們考慮到提供這么一個前端管理平臺可能只是針對公司內部的一部分產品,其他的產品也做了自己的一套前端。他們可以用一個模板,根據自己的邏輯去生成一個storm任務。基于此,我們把整個管理平臺抽象了兩層:最上一層實際上相當于一個面向用戶或者說是類似于前端的一個產品。中間這一層實際上是一個類似于提交作業調度任務,這一層只負責提任務,然后停任務,管理生命周期以及因為故障導致作業失敗了,將作業重新拉起來。這是中間層TSS層做的事情。
這樣,我們就可以對接到所有的前端平臺。通過一個RPC進行TSS通信,就把所有的底層的服務和Filnk和Yarn還有HDFS這些交互的底層的邏輯完全屏蔽開來了。
接下來,用戶寫一個作業就比較簡單了,流程如下:
第一步用戶先要生成自己的一個作業模板,我們這邊通過maven提供的腳本架去生成一些作業的schema,這個作業執行完之后,它會把幫你把一些porm文件,還有一些類似于kafkasource這種常規的組件都幫你準備好,然后你直接在這個模板里面填自己的主要邏輯就可以了。因為我們寫Java程序遇到最多的一個問題就是包沖突問題。所以porm文件幫助用戶把一些可能沖突的一些jar包都給以exclude掉,這樣包沖突的概率會越來越小。
我們測試作業基本上是用IDEA或者local模式去測試,也提供了一個腳本去提交作業,通過這個腳本提交到stage環境上面。在提交注冊在平臺上面去注冊這個作業,然后添加一些配置信息。
下面是一個代碼版本管理的界面:
把整個作業提交之后如下圖所示:
提交完一個作業之后,用戶可能想看作業運行的狀態怎么樣,我們通過四種方式去給用戶展示他的作業運行狀態的。
第一個是Flink UI,也就是官方自帶的UI用戶可以去看。第二個是Dashboard,我們展示了作業里面的task維度,QPS以及task之間的網絡buffer,這些重要的信息匯聚到一起創建了一個Dashboard,這樣可能查問題的時候方便一些。第三個是錯誤日志,其實和大家的思路一樣,把一個分布式的日志然后聚合起來,然后寫到ES上面去。第四是做了一個Jobtrace的工具,就是我們把Flink里面常見的一些異常匹配出來,然后直接給用戶一個wiki的使用指南,告訴用戶比如說你的作業OM了需要擴大內存。只要用戶的作業出現了某些問題,我們把已知的所有的異常都會匹配給用戶。
下面是ES的kibana:
這是我們Jobtrace的功能,我們把Flink的這些常見的異常都匹配出來,每一個異常其實對應了一個wiki然后去讓用戶去解決自己的問題。
最后分享下我們的近期規劃,前面的基本做完并且趨于穩定了,但是現在又遇到了一些新的問題。比如資源使用率這個問題,因為用戶提交作業的時候,用戶對資源不是特別敏感就隨意把一個資源提上去了,可能他明明需要兩個CPU,但是他提了四個CPU。我們想通過一個工具能夠監控到他需要多少資源,然后通知yarn去把這個資源給重置了。就是動態調整job資源,自動把資源重置。
第二個問題是優化作業重啟速度。我們這邊好多業務是根據流式計算的指標來監控它業務的穩定性,如果最上游重啟一個作業,底下一群人收到報警說線上出現一些問題了。原因是最上游某一個作業再重啟。我們想把重啟時間間隔去做到最短或者是無縫重啟,這是下一階段需要去探索探索的一個問題。
第四點:Flink SQL也剛上線,可能需要一些精力投入去推廣。
最后一點,我們希望在此抽象出更多的模式作業模型來,因為我們本身是有一些比如說kafka2ES,kafka2hdfs這些需求,能不能把他們抽象成一個schema,然后去對外提供一些服務。
以上就是我本次分享的主要內容,感謝Flink的舉辦者和參與者,感謝我們的同事,因為以上的分享內容是我和我們同事一起做的。
更多資訊請訪問 Apache Flink 中文社區網站
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。