您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Spark Remote Shuffle Service最佳實踐的示例分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
經過近半年的上線、運營,趣頭條大數據團隊和阿里云 EMR 團隊共同開發的 RSS 可以完美解決 Spark Shuffle 面臨的技術挑戰,為集群的穩定性和容器化的落地提供強有力的保證,其業務價值主要體現在以下方面:
降本增效效果明顯
SLA顯著提升
作業執行效率顯著提升
架構靈活性顯著提升
業務場景與現狀
當前大數據平臺的挑戰和思考
近半年大數據平臺主要的業務指標是降本增效,一方面業務方希望離線平臺每天能夠承載更多的作業,另一方面我們自身有降本的需求,如何在降本的前提下支撐更多地業務量對于每個技術人都是非常大地挑戰。熟悉Spark的同學應該非常清楚,在大規模集群場景下,Spark Shuffle在實現上有比較大的缺陷,體現在以下的幾個方面:
Spark Shuffle Fetch過程存在大量的網絡小包,現有的External Shuffle Service設計并沒有非常細致的處理這些RPC請求,大規模場景下會有很多connection reset發生,導致FetchFailed,從而導致stage重算。
Spark Shuffle Fetch過程存在大量的隨機讀,大規模高負載集群條件下,磁盤IO負載高、CPU滿載時常發生,極容易發生FetchFailed,從而導致stage重算。
重算過程會放大集群的繁忙程度,搶占機器資源,導致惡性循環嚴重,SLA完不成,需要運維人員手動將作業跑在空閑的Label集群。
計算和Shuffle過程架構不能拆開,不能把Shuffle限定在指定的集群內,不能利用部分SSD機器。
M*N次的shuffle過程:對于10K mapper,5K reducer級別的作業,基本跑不完。
NodeManager和Spark Shuffle Service是同一進程,Shuffle過程太重,經常導致NodeManager重啟,從而影響Yarn調度穩定性。
以上的這些問題對于Spark研發同學是非常痛苦的,好多作業每天運行時長方差會非常大,而且總有一些無法完成的作業,要么業務進行拆分,要么跑到獨有的Yarn集群中。除了現有面臨的挑戰之外,我們也在積極構建下一代基礎架構設施,隨著云原生Kubernetes概念越來越火,Spark社區也提供了Spark on Kubernetes版本,相比較于Yarn來說,Kubernetes能夠更好的利用云原生的彈性,提供更加豐富的運維、部署、隔離等特性。但是Spark on Kubernetes目前還存在很多問題沒有解決,包括容器內的Shuffle方式、動態資源調度、調度性能有限等等。我們針對Kubernetes在趣頭條的落地,主要有以下幾個方面的需求:
實時集群、OLAP集群和Spark集群之前都是相互獨立的,怎樣能夠將這些資源形成統一大數據資源池。通過Kubernetes的天生隔離特性,更好的實現離線業務與實時業務混部,達到降本增效目的。
公司的在線業務都運行在Kubernetes集群中,如何利用在線業務和大數據業務的不同特點進行錯峰調度,達成ECS的總資源量最少。
希望能夠基于Kubernetes來包容在線服務、大數據、AI等基礎架構,做到運維體系統一化。
因為趣頭條的大數據業務目前全都部署在阿里云上,阿里云EMR團隊和趣頭條的大數據團隊進行了深入技術共創,共同研發了Remote Shuffle Service(以下簡稱RSS),旨在解決Spark on Yarn層面提到的所有問題,并為Spark跑在Kubernetes上提供Shuffle基礎組件。
Remote Shuffle Service設計與實現
基于上述背景,我們與阿里云EMR團隊共同開發了Remote Shuffle Service。RSS可以提供以下的能力,完美的解決了Spark Shuffle面臨的技術挑戰,為我們集群的穩定性和容器化的落地提供了強有力的保證,主要體現在以下幾個方面:
高性能服務器的設計思路,不同于Spark原有Shuffle Service,RPC更輕量、通用和穩定。
兩副本機制,能夠保證的Shuffle fetch極小概率(低于0.01%)失敗。
合并shuffle文件,從M*N次shuffle變成N次shuffle,順序讀HDD磁盤會顯著提升shuffle heavy作業性能。
減少Executor計算時內存壓力,避免map過程中Shuffle Spill。
計算與存儲分離架構,可以將Shuffle Service部署到特殊硬件環境中,例如SSD機器,可以保證SLA極高的作業。
完美解決Spark on Kubernetes方案中對于本地磁盤的依賴。
Spark RSS架構包含三個角色: Master, Worker, Client。Master和Worker構成服務端,Client以不侵入的方式集成到Spark ShuffleManager里(RssShuffleManager實現了ShuffleManager接口)。
Master的主要職責是資源分配與狀態管理。
Worker的主要職責是處理和存儲Shuffle數據。
Client的主要職責是緩存和推送Shuffle數據。
整體流程如下所示(其中ResourceManager和MetaService是Master的組件),如圖2。
圖2 RSS整體架構圖
3.2.2 實現流程
下面重點來講一下實現的流程:
RSS采用Push Style的shuffle模式,每個Mapper持有一個按Partition分界的緩存區,Shuffle數據首先寫入緩存區,每當某個Partition的緩存滿了即觸發PushData。
Driver先和Master發生StageStart的請求,Master接受到該RPC后,會分配對應的Worker Partition并返回給Driver,Shuffle Client得到這些元信息后,進行后續的推送數據。
Client開始向主副本推送數據。主副本Worker收到請求后,把數據緩存到本地內存,同時把該請求以Pipeline的方式轉發給從副本,從而實現了2副本機制。
為了不阻塞PushData的請求,Worker收到PushData請求后會以純異步的方式交由專有的線程池異步處理。根據該Data所屬的Partition拷貝到事先分配的buffer里,若buffer滿了則觸發flush。RSS支持多種存儲后端,包括DFS和Local。若后端是DFS,則主從副本只有一方會flush,依靠DFS的雙副本保證容錯;若后端是Local,則主從雙方都會flush。
在所有的Mapper都結束后,Driver會觸發StageEnd請求。Master接收到該RPC后,會向所有Worker發送CommitFiles請求,Worker收到后把屬于該Stage buffer里的數據flush到存儲層,close文件,并釋放buffer。Master收到所有響應后,記錄每個partition對應的文件列表。若CommitFiles請求失敗,則Master標記此Stage為DataLost。
在Reduce階段,reduce task首先向Master請求該Partition對應的文件列表,若返回碼是DataLost,則觸發Stage重算或直接abort作業。若返回正常,則直接讀取文件數據。
總體來講,RSS的設計要點總結為3個層面:
采用PushStyle的方式做shuffle,避免了本地存儲,從而適應了計算存儲分離架構。
按照reduce做聚合,避免了小文件隨機讀寫和小數據量網絡請求。
做了2副本,提高了系統穩定性。
對于RSS系統,容錯性是至關重要的,我們分為以下幾個維度來實現:
PushData失敗
當PushData失敗次數(Worker掛了,網絡繁忙,CPU繁忙等)超過MaxRetry后,Client會給Master發消息請求新的Partition Location,此后本Client都會使用新的Location地址,該階段稱為Revive。
若Revive是因為Client端而非Worker的問題導致,則會產生同一個Partition數據分布在不同Worker上的情況,Master的Meta組件會正確處理這種情形。
若發生WorkerLost,則會導致大量PushData同時失敗,此時會有大量同一Partition的Revive請求打到Master。為了避免給同一個Partition分配過多的Location,Master保證僅有一個Revive請求真正得到處理,其余的請求塞到pending queue里,待Revive處理結束后返回同一個Location。
Worker宕機
當發生WorkerLost時,對于該Worker上的副本數據,Master向其peer發送CommitFile的請求,然后清理peer上的buffer。若Commit Files失敗,則記錄該Stage為DataLost;若成功,則后續的PushData通過Revive機制重新申請Location。
數據去重
Speculation task和task重算會導致數據重復。解決辦法是每個PushData的數據片里編碼了所屬的mapId,attemptId和batchId,并且Master為每個map task記錄成功commit的attemtpId。read端通過attemptId過濾不同的attempt數據,并通過batchId過濾同一個attempt的重復數據。
多副本
RSS目前支持DFS和Local兩種存儲后端。
在DFS模式下,ReadPartition失敗會直接導致Stage重算或abort job。在Local模式,ReadPartition失敗會觸發從peer location讀,若主從都失敗則觸發Stage重算或abort job。
大家可以看到RSS的設計中Master是一個單點,雖然Master的負載很小,不會輕易地掛掉,但是這對于線上穩定性來說無疑是一個風險點。在項目的最初上線階段,我們希望可以通過SubCluster的方式進行workaround,即通過部署多套RSS來承載不同的業務,這樣即使RSS Master宕機,也只會影響有限的一部分業務。但是隨著系統的深入使用,我們決定直面問題,引進高可用Master。主要的實現如下:
首先,Master目前的元數據比較多,我們可以將一部分與ApplD+ShuffleId本身相關的元數據下沉到Driver的ShuffleManager中,由于元數據并不會很多,Driver增加的內存開銷非常有限。
另外,關于全局負載均衡的元數據和調度相關的元數據,我們利用Raft實現了Master組件的高可用,這樣我們通過部署3或5臺Master,真正的實現了大規模可擴展的需求。
實際效果與分析
團隊針對TeraSort,TPC-DS以及大量的內部作業進行了測試,在Reduce階段減少了隨機讀的開銷,任務的穩定性和性能都有了大幅度提升。
圖3是TeraSort的benchmark,以10T Terasort為例,Shuffle量壓縮后大約5.6T。可以看出該量級的作業在RSS場景下,由于Shuffle read變為順序讀,性能會有大幅提升。
圖3 TeraSort性能測試(RSS性能更好)
圖4是一個線上實際脫敏后的Shuffle heavy大作業,之前在混部集群中很小概率可以跑完,每天任務SLA不能按時達成,分析原因主要是由于大量的FetchFailed導致stage進行重算。使用RSS之后每天可以穩定的跑完,2.1T的shuffle也不會出現任何FetchFailed的場景。在更大的數據集性能和SLA表現都更為顯著。
圖4 實際業務的作業stage圖(使用RSS保障穩定性和性能)
在大數據團隊和阿里云EMR團隊的共同努力下,經過近半年的上線、運營RSS,以及和業務部門的長時間測試,業務價值主要體現在以下方面:
降本增效效果明顯,在集群規模小幅下降的基礎上,支撐了更多的計算任務,TCO成本下降20%。
SLA顯著提升,大規模Spark Shuffle任務從跑不完到能跑完,我們能夠將不同SLA級別作業合并到同一集群,減小集群節點數量,達到統一管理,縮小成本的目的。原本業務方有一部分SLA比較高的作業在一個獨有的Yarn集群B中運行,由于主Yarn集群A的負載非常高,如果跑到集群A中,會經常的掛掉。利用RSS之后可以放心的將作業跑到主集群A中,從而釋放掉獨有Yarn集群B。
作業執行效率顯著提升,跑的慢 -> 跑的快。我們比較了幾個典型的Shuffle heavy作業,一個重要的業務線作業原本需要3小時,RSS版本需要1.6小時。抽取線上5~10個作業,大作業的性能提升相當明顯,不同作業平均下來有30%以上的性能提升,即使是shuffle量不大的作業,由于比較穩定不需要stage重算,長期運行平均時間也會減少10%-20%。
架構靈活性顯著提升,升級為計算與存儲分離架構。Spark在容器中運行的過程中,將RSS作為基礎組件,可以使得Spark容器化能夠大規模的落地,為離線在線統一資源、統一調度打下了基礎。
關于Spark Remote Shuffle Service最佳實踐的示例分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。