亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SPARK的MAster資源調度原理(源碼)分析

發布時間:2020-08-04 05:52:33 來源:網絡 閱讀:724 作者:惡魔蘇醒ing 欄目:大數據

SPARK的MAster資源分配算法(SPARK1.3)

master資調度通過源碼中的 org.apache.spark.deploy.master包下的schedule()方法實現

步驟如下:

  1. 首先判斷master是否是alive狀態,如果不是alive則返回,也就是只有活動的master才會進行資源調度,standby master是不會進行資源調度的

  2. 把之前注冊的worker中的alive狀態的worker傳入 Random.shuffer方法,該方法主要是把worker順序打亂,返回一個數組

  3. 獲取返回的worker的數量

  4. 用for循環進行driver調度,只有啟用yarn-cluster模式提交application才會進行driver調度,因為yarn-client和 standalone模式都是在提交的客戶端啟動driver,不需要調度

  5. for循環遍歷WaittingDrivers ArrayBuffer,里面用while循環判斷如果有alive的worker沒有遍歷,并且driver為為啟動狀態,則繼續遍歷

  6. 如果這個worker的內存>=driver需要的內存并且CPU>=driver需要的CPU,則啟動driver,將driver從WaittingDrivers隊列中移除

  7. 啟動driver的方法為launchDriver,將driver加入worker的內部緩存,將worker剩余的內存、CPU減去driver需要的內存、CPU,worker也被加入到driver緩存結構中,然后調用worker的actor方法,給worker發送LaunchDriver消息,讓它把driver啟動起來,然后將driver狀態改為RUNNING

  8. driver啟動后,進行application的調度,這里有兩個算法,spreadOutApps和非spreadOutApps算法,這個在代碼的SparkConf里可以設置, ("spark.deploy.spreadOut", true),默認是為true,啟用spreadoutApps

  9. for遍歷WaittingApps中的application,并且用if守衛過濾出還需要進行CPU分配的application,for循環里再次過濾狀態為alive并且可以被application使用的worker,然后按照其剩余的CPU數量倒序排序(可以被application使用的worker必須是可用內存大于等于application最小executor需要的需要的內存,并且沒有被application啟用過)

  10. 把需要分配的application數量放入一個數組,然后獲取最終需要分配的CPU數量=application需要分配的CPU和worker總CPU的最小值

  11. while遍歷worker,如果worker還有可分配的CPU,將總的需要分配的CPU-1,給這個worker分配的CPU+1,指針移到下一個CPU。循環一直到CPU分配完,這種分配算法的結果是application的CPU盡可能的平均分配到了各個worker上,應用程序盡可能多的運行在所有的Node上

  12. 給worker分配完CPU后,遍歷分配到CPU的worker,在每個application內部緩存結構中,添加executor,創建executorDSC對象,其中封裝了給這個executor分配多少 CPU core,然后在worker上啟動executor,將application狀態改為RUNNING

  13. 如果是非spreadOutApps算法,剛好相反,先把每個worker的CPU全部分配完,在分配下一個worker的CPU,

    以下是核心源碼:


  private def schedule() {

    /*

     * 首先判斷 master是否是alive狀態

     */

    if (state != RecoveryState.ALIVE) { return }


    // First schedule drivers, they take strict precedence over applications

    // Randomization helps balance drivers

    //把alive狀態的worker(之前注冊的)傳入Random.shuffle方法,把worker隨機打亂

    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))

    //獲取當前可用worker的數量(隨機打亂后)

    val numWorkersAlive = shuffledAliveWorkers.size

    var curPos = 0

    /*

     * driver的調度機制,遍歷waitingDrivers這個ArrayBuffer

     * 只有用 yarn-cluster模式提交的時候,才會注冊driver,并導致driver被調度,因為standalone和yarn-client模式

     * 都會在本地啟動driver,而不會注冊,更不會調度

     */ 

    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers

      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we

      // start from the last worker that was assigned a driver, and continue onwards until we have

      // explored all alive workers.

      var launched = false

      var numWorkersVisited = 0

      //當還有alive的worker沒有遍歷,并且driver沒有啟動,則繼續遍歷worker

      while (numWorkersVisited < numWorkersAlive && !launched) {

        val worker = shuffledAliveWorkers(curPos)

        numWorkersVisited += 1

        //如果這個worker空閑內存>=driver需要的內存并且worker的空閑CPU>=driver需要的CPU

        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {

          //啟動driver

          launchDriver(worker, driver)

          //并且經driver從waitingDrivers隊列中移除

          waitingDrivers -= driver

          launched = true

        }

        //指針指向下一個worker

        curPos = (curPos + 1) % numWorkersAlive

      }

    }


    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app

    // in the queue, then the second app, etc.

    /*

     * application的調度機制

     * 這里兩種算法,可以在sparkconf設置,默認為true(spreadOutApps算法)

     */

    if (spreadOutApps) {

      // Try to spread out each app among all the nodes, until it has all its cores

      //遍歷waitingApps 中的application,并且用if守衛過濾出還需要進行CPU分配的application

      for (app <- waitingApps if app.coresLeft > 0) {

        //再次過濾狀態為alive并且可以被application使用的worker,然后按照其剩余的CPU數量倒序排序

        //可以被application使用的worker必須是可用內存大于application最小Executor需要的內存,并且沒有被該application啟用過

        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)

          .filter(canUse(app, _)).sortBy(_.coresFree).reverse

          //創建一個數組,存儲需要分配的CPU數量

        val numUsable = usableWorkers.length

        val assigned = new Array[Int](numUsable) // Number of cores to give on each node

        //獲取到底需要分配多少CPU,取application需要分配的CPU和worker總共CPU數量的最小值

        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

        var pos = 0

        while (toAssign > 0) { }

        // Now that we've decided how many cores to give on each node, let's actually give them

        //給worker分配完CPU后,遍歷worker

        for (pos <- 0 until numUsable) {

          //只要worker分配到了CPU

          if (assigned(pos) > 0) {

            //首先在每個application內部緩存結構中,添加executor,

            //創建executorDSC對象,其中封裝了給這個executor分配多少 CPU core

            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))

            //那么就在worker上啟動Executor

            launchExecutor(usableWorkers(pos), exec)

            //將application狀態設置為RUNNING

            app.state = ApplicationState.RUNNING

          }

        }

      }

    } else {

      // Pack each app into as few nodes as possible until we've assigned all its cores

      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {

        for (app <- waitingApps if app.coresLeft > 0) {

          if (canUse(app, worker)) {

            val coresToUse = math.min(worker.coresFree, app.coresLeft)

            if (coresToUse > 0) {

              val exec = app.addExecutor(worker, coresToUse)

              launchExecutor(worker, exec)

              app.state = ApplicationState.RUNNING

            }

          }

        }

      }

    }

  }


  //executor的啟動

def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {

    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)

    //將executor加入worker內部緩存

    worker.addExecutor(exec)

    //向worker發送LaunchExecutor消息

    worker.actor ! LaunchExecutor(masterUrl,

      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)

      //向executor對應的application發送ExecutorAdded消息

    exec.application.driver ! ExecutorAdded(

      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)

  }


//Driver的啟動

  def launchDriver(worker: WorkerInfo, driver: DriverInfo) {

    logInfo("Launching driver " + driver.id + " on worker " + worker.id)

    //將driver加入到worker的內部緩存結構

    //將worker剩余的內存、CPU減去driver使用的內存和CPU

    worker.addDriver(driver)

    //worker也被加入到driver內部緩存結構中

    driver.worker = Some(worker)

    //然后調用worker的actor方法,給worker發送LaunchDriver消息,讓它把driver啟動起來

    worker.actor ! LaunchDriver(driver.id, driver.desc)

    //將driver狀態改為RUNNING

    driver.state = DriverState.RUNNING

  }






向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

兴隆县| 扶风县| 全南县| 中山市| 依兰县| 班玛县| 昌图县| 连平县| 和平县| 岐山县| 邻水| 凌海市| 河北省| 历史| 治县。| 赤峰市| 宿迁市| 崇信县| 宜黄县| 泾阳县| 肥乡县| 闽清县| 建湖县| 孟津县| 会东县| 玛沁县| 三河市| 台中县| 大埔区| 景泰县| 陵川县| 吉木萨尔县| 丹东市| 云安县| 临朐县| 龙泉市| 扎兰屯市| 岑巩县| 巢湖市| 延庆县| 云浮市|