亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark(二):spark架構及物理執行圖

發布時間:2020-08-05 12:00:29 來源:網絡 閱讀:851 作者:afeiye 欄目:大數據

spark(二):spark架構及物理執行圖
上圖是一個job的提交流程圖,job提交的具體步驟如下

  1. 一旦有action,就會觸發DagScheduler.runJob來提交任務,主要是先生成邏輯執行圖DAG,然后調用 finalStage = newStage() 來劃分 stage。
  2. new Stage() 的時候會調用 finalRDD 的 getParentStages();
  3. getParentStages() 從 finalRDD 出發,反向 visit 邏輯執行圖,遇到 NarrowDependency 就將依賴的 RDD 加入到 stage,遇到 ShuffleDependency 切開 stage,并遞歸到 ShuffleDepedency 依賴的 stage。
  4. 一個 ShuffleMapStage(不是最后形成 result 的 stage)形成后,會將該 stage 最后一個 RDD 注冊到MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size),這一步很重要,因為 shuffle 過程需要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數據的位置。
  5. 之后是submitStage(finalStage)
  6. 先確定該 stage 的 missingParentStages,使用getMissingParentStages(stage)。如果 parentStages 都可能已經執行過了,那么就為空了。
  7. 如果 missingParentStages 不為空,那么先遞歸提交 missing 的 parent stages,并將自己加入到 waitingStages 里面,等到 parent stages 執行結束后,會觸發提交 waitingStages 里面的 stage。
  8. 如果 missingParentStages 為空,說明該 stage 可以立即執行,那么就調用submitMissingTasks(stage, jobId)來生成和提交具體的 task。如果 stage 是 ShuffleMapStage,那么 new 出來與該 stage 最后一個 RDD 的 partition 數相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出來與 stage 最后一個 RDD 的 partition 個數相同的 ResultTasks。一個 stage 里面的 task 組成一個 TaskSet,最后調用taskScheduler.submitTasks(taskSet)來提交一整個 taskSet。
  9. taskScheduler會把task發給DriverActor進程,DriverActor序列話之后發給exector真正執行。

spark(二):spark架構及物理執行圖
上圖是task執行流程,具體執行過程如下

  1. Worker 端接收到 tasks 后,executor 將 task 包裝成 taskRunner,并從線程池中抽取出一個空閑線程運行 task。
  2. Executor 收到 serialized 的 task 后,先 deserialize 出正常的 task,然后運行 task 得到其執行結果 directResult,這個結果要送回到 driver 那里。
  3. 如果 result 比較大(比如 groupByKey 的 result)先把 result 存放到本地的“內存+磁盤”上,由 blockManager 來管理,只把存儲位置信息(indirectResult)發送給 driver。
  4. ShuffleMapTask 生成的是 MapStatus,MapStatus 包含兩項內容:一是該 task 所在的 BlockManager 的 BlockManagerId(實際是 executorId + host, port, nettyPort),二是 task 輸出的每個 FileSegment 大小。
  5. ResultTask 生成的 result 的是 func 在 partition 上的執行結果。**比如 count() 的 func 就是統計 partition 中 records 的個數。
  6. Driver 收到 task 的執行結果 result 后會進行一系列的操作:
  7. a,首先告訴 taskScheduler 這個 task 已經執行完,然后去分析 result。
  8. b,如果是 ResultTask 的 result,那么可以使用 ResultHandler 對 result 進行 driver 端的計算(比如 count() 會對所有 ResultTask 的 result 作 sum)
  9. c,如果 result 是 ShuffleMapTask 的 MapStatus,那么需要將 MapStatus(ShuffleMapTask 輸出的 FileSegment 的位置和大小信息)存放到 mapOutputTrackerMaster 中的 mapStatuses 數據結構中以便以后 reducer shuffle 的時候查詢
  10. d,如果 driver 收到的 task 是該 stage 中的最后一個 task,那么可以 submit 下一個 stage,如果該 stage 已經是最后一個 stage,那么告訴 dagScheduler job 已經完成
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

高平市| 衢州市| 通辽市| 全州县| 江孜县| 鹿邑县| 淮安市| 巴青县| 海宁市| 和政县| 逊克县| 定结县| 湾仔区| 中牟县| 桑植县| 康定县| 宁城县| 常熟市| 霞浦县| 青岛市| 忻城县| 水富县| 德阳市| 宜州市| 普安县| 昌宁县| 宜川县| 汉源县| 宜宾县| 和平县| 天峨县| 福州市| 皋兰县| 夏河县| 孝昌县| 江都市| 黔南| 新蔡县| 龙胜| 阳山县| 巴青县|