亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行Apache Spark源碼分析Job的提交與運行

發布時間:2021-12-16 21:58:01 來源:億速云 閱讀:131 作者:柒染 欄目:云計算

如何進行Apache Spark源碼分析Job的提交與運行 ,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

下面以wordCount為例,詳細說明spark創建和運行job的過程,重點是在進程及線程的創建。

實驗環境搭建

在進行后續操作前,確保下列條件已滿足。

1. 下載spark binary 0.9.1

2. 安裝scala

3. 安裝sbt

4. 安裝java

啟動spark-shell單機模式運行,即local模式

local模式運行非常簡單,只要運行以下命令即可,假設當前目錄是$SPARK_HOME

MASTER=local bin/spark-shell

"MASTER=local"就是表明當前運行在單機模式

local cluster方式運行

localcluster模式是一種偽cluster模式,在單機環境下模擬standalone的集群,啟動順序分別如下

1. 啟動master

2. 啟動worker

3. 啟動spark-shell

master$SPARK_HOME/sbin/start-master.sh

注意運行時的輸出,日志默認保存在$SPARK_HOME/logs目錄。

master主要是運行類 org.apache.spark.deploy.master.Master在8080端口啟動監聽,日志如下圖所示如何進行Apache Spark源碼分析Job的提交與運行

修改配置

1. 進入$SPARK_HOME/conf目錄

2. 將spark-env.sh.template重命名為spark-env.sh

3. 修改spark-env.sh,添加如下內容

export SPARK_MASTER_IP=localhostexport SPARK_LOCAL_IP=localhost運行workerbin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1  -c 1 -m 512M

worker啟動完成,連接到master。打開maser的webui可以看到連接上來的worker. Master WEb UI的監聽地址是http://localhost:8080

啟動spark-shellMASTER=spark://localhost:7077 bin/spark-shell

如果一切順利,將看到下面的提示信息。

Created spark context..Spark context available as sc.

可以用瀏覽器打開localhost:4040來查看如下內容

1. stages

2. storage

3. environment

4. executors

wordcount

上述環境準備妥當之后,我們在sparkshell中運行一下最簡單的例子,在spark-shell中輸入如下代碼

scala>sc.textFile("README.md").filter(_.contains("Spark")).count

上述代碼統計在README.md中含有Spark的行數有多少

部署過程詳解

Spark布置環境中組件構成如下圖所示。

如何進行Apache Spark源碼分析Job的提交與運行 



  • Driver Program 簡要來說在spark-shell中輸入的wordcount語句對應于上圖的Driver Program.

  • Cluster Manager 就是對應于上面提到的master,主要起到deploy management的作用

  • Worker Node 與Master相比,這是slave node。上面運行各個executor,executor可以對應于線程。executor處理兩種基本的業務邏輯,一種就是driver     programme,另一種就是job在提交之后拆分成各個stage,每個stage可以運行一到多個task

Notes: 在集群(cluster)方式下, Cluster Manager運行在一個jvm進程之中,而worker運行在另一個jvm進程中。在local cluster中,這些jvm進程都在同一臺機器中,如果是真正的standalone或Mesos及Yarn集群,worker與master或分布于不同的主機之上。

JOB的生成和運行

job生成的簡單流程如下

1. 首先應用程序創建SparkContext的實例,如實例為sc

2. 利用SparkContext的實例來創建生成RDD

3. 經過一連串的transformation操作,原始的RDD轉換成為其它類型的RDD

4. 當action作用于轉換之后RDD時,會調用SparkContext的runJob方法

5. sc.runJob的調用是后面一連串反應的起點,關鍵性的躍變就發生在此處

調用路徑大致如下

1. sc.runJob->dagScheduler.runJob->submitJob

2. DAGScheduler::submitJob會創建JobSummitted的event發送給內嵌類eventProcessActor

3. eventProcessActor在接收到JobSubmmitted之后調用processEvent處理函數

4. job到stage的轉換,生成finalStage并提交運行,關鍵是調用submitStage

5. 在submitStage中會計算stage之間的依賴關系,依賴關系分為寬依賴窄依賴兩種

6. 如果計算中發現當前的stage沒有任何依賴或者所有的依賴都已經準備完畢,則提交task

7. 提交task是調用函數submitMissingTasks來完成

8. task真正運行在哪個worker上面是由TaskScheduler來管理,也就是上面的submitMissingTasks會調用TaskScheduler::submitTasks

9. TaskSchedulerImpl中會根據Spark的當前運行模式來創建相應的backend,如果是在單機運行則創建LocalBackend

10. LocalBackend收到TaskSchedulerImpl傳遞進來的ReceiveOffers事件

11. receiveOffers->executor.launchTask->TaskRunner.run

代碼片段executor.lauchTask

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {    val tr = new TaskRunner(context, taskId, serializedTask)    runningTasks.put(taskId, tr)    threadPool.execute(tr)  }

說了這么一大通,也就是講最終的邏輯處理切切實實是發生在TaskRunner這么一個executor之內。

運算結果是包裝成為MapStatus然后通過一系列的內部消息傳遞,反饋到DAGScheduler,這一個消息傳遞路徑不是過于復雜。

看完上述內容,你們掌握如何進行Apache Spark源碼分析Job的提交與運行 的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宁夏| 临江市| 鹰潭市| 宜州市| 宽甸| 依兰县| 湘西| 象州县| 井冈山市| 吐鲁番市| 逊克县| 西和县| 邵阳县| 根河市| 沾益县| 额尔古纳市| 深泽县| 鄄城县| 玛多县| 兴隆县| 永福县| 格尔木市| 洪湖市| 海口市| 道真| 定边县| 宁南县| 曲水县| 汉寿县| 都昌县| 新民市| 芒康县| 奉化市| 孟村| 漳州市| 涪陵区| 翁源县| 怀远县| 新宾| 磐石市| 织金县|