您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關怎么分析spark計算框架,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
首先明確一點:學計算框架主要就是學2部分:1.資源調度 2.任務調度
寫一個spark程序包含加載配置文件,創建上下文,創建RDD , 調用RDD的算子,用戶在算子中自定義的函數
map端:狹窄的理解是MapReduce中的map端,本質就是將數據變成你想要的形式,例如:按照空格切分,乘2等等操作。
shuffle : 分為shuffle write(臨時存到本地磁盤)和shuffle read(從磁盤拉數據,同一個分區的拉到一個partition上)階段,本質就是數據的規整,例如同一個分區的拉到一塊。
reduce端:狹窄的理解是MapReduce中的reduce端,本質就是數據的聚合
寬泛的理解2個stage之間,前面的可以說是map端,后面的stage可以理解為reduce端,中間正好需要shuffle過程,且shuffle過程需要再shuffle write階段將數據暫時存到本地磁盤上。
spark專業術語:
任務相關的專業術語:
1.application:用戶寫的應用程序(包含2部分:Driver Program(運行應用的main()方法,創建spark上下文 )和Executor Program(用戶在算子中自定義的函數))
2.job:一個action類算子觸發執行的操作,有多少個action類算子就有多少個job,一個應用程序可以有多個job.
3.stage(階段):一組任務(task)就是一個stage,例如MapReduce中一組的map task(一個切片對應一個map task),一個job中可以有有多個stage(根據寬依賴為分界線來劃分的)
.4.task(任務:底層就是一個thread(線程)):在集群運行時最小的執行單元
集群相關的專業術語:
Master:資源管理的主節點
Worker:資源管理的從節點
Executor:執行任務的進程,運行在worker節點上,負責運行task,負責將數據存儲到內存或磁盤,每個application有多個獨立的Executors
ThreadPool:線程池,存在與Executor進程中,task在線程池中運行
RDD的依賴關系
RDD有5大特性:
1.一個RDD有多個partition組成。
2.每個算子實質上作用于每個partition上。
3.每個RDD依賴其父RDD.
4.可選項 :分區器是作用于KV格式的RDD上
5.可選項:RDD會提供一系列的最佳的計算位置
父RDD不知道其子RDD,但是子RDD知道的的所有父RDD
1.窄依賴:父RDD與子RDD,partition的關系是一對一,這種情況并沒有shuffle過程
例如:map(x=>x.split(" "))
2.寬依賴 : 父RDD與子RDD,partition之間的關系是一對多,這種情況下一般都會導致shuffle數據規整的過程
例如:groupByKey()->相同key的二元組一定在同一個分區中,無參的情況下子RDD的分區數等于父RDD的分區數(也就是會先計算key的hash函數再與父RDD的分區數求余,所以最終的數據一定會散落在這幾個分區中),當然你可以傳入參數,這個參數用于鎖定該子RDD有多少個分區,后面調優的時候會用到。
groupBy:根據指定的作為分組依據,同sortBy和sortByKey
寬窄依賴的作用是:將job切割成多個stage.從祖先RDD開始找,如果是窄依賴繼續往下找,以寬依賴為切割點,分為2個stage
那么為什么要劃分出stage呢?因為每個stage中的RDD都是窄依賴,沒有shuffle過程,且每個partition都是一對一的關系,所以可以在后面以管道的形式使每個partition上的task并行處理 (簡單說就是為了是每個task以管道的形式進行計算)
關于stage的一個結論:stage與stage之間是寬依賴,stage內部都是窄依賴
形成一個DAG(有向無環圖)需要從最后一個RDD往前回溯:因為子RDD知道父RDD,但是父RDD不知道子RDD
RDD中不是存儲的真實數據,而是存儲的對數據處理的邏輯過程
對于KV格式的RDD應該說:存儲的邏輯過程的返回類型是二元組類型我們稱為是KV格式的RDD
每個task作用于partition所在的block或副本所在的節點上(計算向數據移動,本地化可以大大減少網絡傳輸),這里task的計算邏輯(也就是這個展開式),處理的結果并沒有落地(存到磁盤的意思),而是以管道的模式,一條一條數據的從partition(邏輯上的,數據存在block上)中讀到內存,在內存中一直連續的執行,直到最后執行完這個task才可能會落地,一條接著一條的流式處理,一個task中的數據像流水線一樣,多個task是并行計算的。
偽代碼中的輸出:一條filter的輸出,一條map的輸出,交替出現,而不是先將filter中的所有數據都打印出來,再打印map的數據。
從這里就能明顯感覺到spark計算框架比MapReduce計算框架的優勢:基于內存迭代,不需要落地,不需要存儲到磁盤,減少了磁盤IO,大大提高了效率。
幾個問題:
1.stage中的task(管道模式)并行計算,什么時候會落地磁盤呢?
①如果是stage后面是action類算子
collect:將每個管道中的輸出結果收集到driver端的內存中
saveAsTextFile:將每個管道中的輸出結果保存到指定目錄,可以是本地磁盤,也可以是hdfs中
count:將管道的計算結果統計記錄數,返回給Driver
②如果是stage后面是stage
在shuffle write節點會寫到本地磁盤暫時存儲,因為內存中的數據不夠穩定,為了防止reduce task拉取數據失敗
2.spark在計算過程中,是不是非常消耗內存?
不是,正常使用,因為管道是很細的不會導致內存過大,多個task并行運算,也是正常使用,但是如果使用控制類算子的 cache,就會消耗大量內存,因為如果一個rdd調用cache(),會將這個管道,開一個口,將數據復制一份放到內存中存儲,方便下次運行,但是非常消耗內存。
3.RDD彈性分布式數據集,為什么不存儲數據,還依然叫數據集?
因為它有處理數據的能力,可以通過生活的例子來舉例說明:例如:滴滴雖然每年一直虧損,但是市值依然很高,因為他雖然沒錢,但有創造錢的能力
對比一下spark和MapReduce的計算模式的差異:
mapreduce是1+1=2 2+1=3
spark是1+1+1=3
spark的任務調度過程:
1.首先編寫一個Application(上面的這個程序缺少一個action算子),一個spark應用程序是基于RDD來操作的,會先創建出相應的RDD對象,然后建立一個系統DAG(有向無環圖)
2.DAGScheduler(有向無環圖調度器)分割這個DAG,將其分割成多個stage,每個stage中有一組的task,所以也叫TaskSet(任務集合),一個stage就是一個TaskSet
3.將TaskSet提交給TaskScheduler(任務調度器),經由集群管理者發送任務到worker節點運行,監控task,會重試失敗的task和掉隊的task,不可能無限重試,所以限制重試次數為3次,默認最大失敗次數為4次,如果重試了3次還是失敗,此時TaskScheduler會向DAGScheduler匯報當前失敗的task所在的stage失敗,此時DAGScheduler收到匯報也會重試該stage,重試次數默認為4次,注意此時已經成功執行的task不需要再重新執行了,只需要提交失敗的task就行,如果stage重試4次失敗,說明這個job就徹底失敗了,job沒有重試。
那么問題是發送到哪個work節點呢?最好是存儲節點(HDFS)包含計算節點(這里是spark集群),因為這樣為了數據本地化。根據文件名就可以獲得該文件的所有信息,根據文件名可以獲得每一個block的位置,以及block所在節點的ip等,然后就將task發送到該節點運行就行。
4.task放到work節點的executor進程中的線程池中運行
spark資源調度的方式
粗粒度的資源調度
在任務執行前申請到所需的所有資源,當所有 task 執行完畢后再釋放資源
優點:task 直接使用已經申請好的資源,執行效率高
缺點:所有的 task 執行完畢才釋放資源,可能導致集群資源浪費,例如只剩一個 task 遲遲不能結束,那么大量資源將被閑置
細粒度的資源調度
任務執行時,task 自己去申請資源,執行完畢后釋放資源
優點:使集群資源得以充分利用
缺點:task 需要自己申請資源,執行效率低
spark on standalone 執行流程
1> worker 節點啟動,向 master 匯報信息,該信息被存儲在 workers 對象中,workers 底層使用 HashSet 數據結構,為了防止同一臺 worker 節點在 master 中注冊兩次(worker 節點掛掉但是迅速恢復可能會導致此問題)
2> 在客戶端提交任務,這里以客戶端提交方式為例,首先客戶端會啟動 driver 進程,然后構建Spark Application的運行環境,創建 SparkContext 對象,這會創建并初始化 TaskScheduler 和 DAGScheduler 兩個對象
3> 當兩個對象創建完成后,TaskScheduler 會向 master 為 Application 申請資源, Application 的信息會注冊在 master 上的 waitingApps 對象中,waitingApps 使用 ArrayBuffer 存儲數據
4> 當 waitingApps 集合中的元素發生變化時會回調 schedule() 方法,這時 master 就知道有 Appliacation 在請求執行。master 會去讀取 workers 來獲取自己掌握的 worker 節點,然后在資源充足的 worker 節點上為 Appliacation 分配資源 -> 通知 worker 節點啟動Executor 進程,Executor 進程啟動時會在內部初始化一個線程池,用來執行 task
–master 采用輪循方式分配資源,確保整個集群的資源得到充分利用,并有利于后面分發 task 時實現數據本地化–每一個 worker 節點上默認為 Applacation 啟動 1 個 Executor 進程,該 Executor 進程默認使用 1G 內存和該 worker 節點上空閑的所有的核可通過在提交任務時使用 - -executor-cores 和 - -executor-memory 來手動指定每個 Executor 使用的資源–spark 采用粗粒度的資源調度,當所有 task 都執行完畢后,才進行資源回收
5> 當 Executor 成功啟動后,會去向 TaskScheduler 反向注冊,此時 TaskScheduler 就得到所有成功啟動的 Executor 的信息
6> SparkContext 對象解析代碼構建DAG(有向無環圖)交給 DAGScheduler,每一個 job 會構建一個DAG圖,DAGScheduler 根據 DAG 中 RDD 的寬窄依賴將其切分成一個個 stage,每個 stage 中包含一組 task,每個 task 因為都是窄依賴,不會產生 shuffle,所以都是 pipeline(管道) 計算模式
7> DAGScheduler 將一個 stage 封裝到一個 taskSet 中,傳給 TaskScheduler,TaskScheduler拿到后遍歷 taskSet ,得到一個個 task,解讀其要計算的數據,然后調用 HDFS 的 API 得到數據所在的位置
8> 本著計算向數據靠攏的原則,TaskScheduler 將 task 分發到其所要計算的數據所在的節點的 Executor 進程中,task 最后會被封裝到線程池里的一個線程中執行,task 執行的過程中 TaskScheduler 會對其進行監控
9> 如果 task 執行失敗,TaskScheduler 會進行重試,再次分發該 task ,最多重試3次;
如果 task 陷入掙扎并且 spark 開啟了推測執行,TaskScheduler 會換一個節點分發陷入掙扎的 task,兩個 task 誰先執行完就以誰的結果為準
陷入掙扎的判定標準:當75%的 task 已經執行完畢后,這時 TaskScheduler 每隔10ms會計算一次剩余 task 當前執行時間的中值 t,然后以 t 的1.5倍 為標準,未執行完的 task 當前執行時間如果大于 t*1.5 則該 task 被判定為陷入掙扎的 task
10> 如果3次重試后 task 依然執行失敗,該 task 所在的 stage 就會被判定為失敗,TaskScheduler 會向 DAGScheduler 反饋,DAGScheduler 會重試失敗的 stage,最多重試4次,如果4次重試后該 stage 依然失敗,則該 job 被判定為失敗,程序中止
DAGScheduler 重試 stage 時只會重試 stage 中失敗的 task
11> 當所有 task 成功執行完畢后或 job 失敗,driver 會通知 master, master 會通知 worker kill 掉 Executor,完成資源回收
以上就是怎么分析spark計算框架,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。