亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Yarn中如何實現ScheduleBackend

發布時間:2021-06-26 14:07:41 來源:億速云 閱讀:346 作者:Leah 欄目:云計算

這篇文章將為大家詳細講解有關Yarn中如何實現ScheduleBackend,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

Yarn方式下的ScheduleBackend是用的啥?

在SparkContext中創建ScheduleBackend時,會根據指定的”master“參數的前綴決定創建哪種ScheduleBackend,對于"yarn://host:port"這樣的URL來說,如果是cluster模式,就是創建YarnClusterSchedulerBackend,如果是client模式,就是創建YarnClientSchedulerBackend。

我們還是先看看YarnClusterSchedulerBackend的代碼結構把。

YarnClusterSchedulerBackend繼承了YarnSchedulerBackend,沒有太多的發揮代碼,我們直接看YarnSchedulerBackend把。估計client模式下也差不多。

YarnSchedulerBackend又繼承了CoarseGrainedSchedulerBackend,我們看看不同點在哪里。

覆寫了doRequestTotalExecutors和doKillExecutors方法,一個申請Executor,一個殺死Executor。

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
    yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
  }  
  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
    yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
  }

yarnSchedulerEndpointRef就是同一個文件里的endpoint端,看看具體的執行代碼是什么:

      case r: RequestExecutors =>
        amEndpoint match {
          case Some(am) =>
            am.ask[Boolean](r).andThen {
              case Success(b) => context.reply(b)
              case Failure(NonFatal(e)) =>
                logError(s"Sending $r to AM was unsuccessful", e)
                context.sendFailure(e)
            }(ThreadUtils.sameThread)         
        }
      case k: KillExecutors =>
        amEndpoint match {
          case Some(am) =>
            am.ask[Boolean](k).andThen {
              case Success(b) => context.reply(b)
              case Failure(NonFatal(e)) =>
                logError(s"Sending $k to AM was unsuccessful", e)
                context.sendFailure(e)
            }(ThreadUtils.sameThread)          
        }

我們看到它又將消息轉給了amEndpoint,就是轉給了yarn工程里的ApplicationManager。又要跳到ApplicationManager去看看里面的實現邏輯了,真是一波三折啊。

ApplicationManager里是怎么處理RequestExecutors和KillExecutors兩個消息的呢?

      case r: RequestExecutors =>
        Option(allocator) match {
          case Some(a) =>
            if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
              r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
              resetAllocatorInterval()
            }
            context.reply(true)
        }
      case KillExecutors(executorIds) =>
        Option(allocator) match {
          case Some(a) => executorIds.foreach(a.killExecutor)
        }
        context.reply(true)

調用allocator的killExecutor和requestTotalExecutorsWithPreferredLocalities方法。allocator又是啥?這里是不是類有的太多了啊。。

allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      appAttemptId,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)

是client的createAllocator方法創建出來的,client是啥?是YarnRMClient,我們就要先看看YarnRMClient了,看名字就大概能猜到,YarnRMClient就是來向Yarn機器申請Executor和殺死Executor的。

createAllocator方法返回下面的YarnAllocator:

 return new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, appAttemptId, securityMgr,
      localResources, SparkRackResolver.get(conf))

來到YarnAllocator。

YarnAllocator的killExecutor方法很好理解,就是釋放Yarn中的Container:

 def killExecutor(executorId: String): Unit = synchronized {
    executorIdToContainer.get(executorId) match {
      case Some(container) if !releasedContainers.contains(container.getId) =>
        internalReleaseContainer(container)
        runningExecutors.remove(executorId)
      case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
    }
  }

申請Executor其實最終是在runAllocatedContainers方法中實現的。

核心代碼看一下把,完整的可以看源碼:

    if (runningExecutors.size() < targetNumExecutors) {
        numExecutorsStarting.incrementAndGet()
        if (launchContainers) {
          launcherPool.execute(() => {
            try {
              new ExecutorRunnable(
                Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                executorMemory,
                executorCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources
              ).run()
              updateInternalState()
            } catch {              
            }
          })
        }

申請targetNumExecutors個ExecutorRunner,這樣就和Standalone的申請Executor對應起來了。好了,整個過程就是這樣了。

最終就會在Yarn集群中申請了所需數目的Container,并且在Container中啟動ExecutorRunner,來向Driver匯報成績。

這里的ExecutorRunner就是YarnCoarseGrainedExecutorBackend線程,在ExecutorRunner類中可以看到。

關于Yarn中如何實現ScheduleBackend就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

繁峙县| 凯里市| 多伦县| 绥中县| 招远市| 鹤山市| 漳州市| 蓝山县| 本溪| 隆德县| 日土县| 榆社县| 天柱县| 海门市| 元朗区| 郓城县| 葵青区| 慈溪市| 资溪县| 宜宾县| 九龙城区| 拜泉县| 黄梅县| 班戈县| 云林县| 家居| 柳河县| 肇源县| 明光市| 南平市| 大同市| 郎溪县| 磐安县| 阿克| 汉阴县| 凤冈县| 樟树市| 临洮县| 滨州市| 静安区| 龙里县|