Spark Master Resource Allocation Algorithm (Spark 1.3)
Master resource scheduling is implemented by the schedule() method in the org.apache.spark.deploy.master package.
The steps are as follows:
First, check whether the Master is in the ALIVE state; if it is not, return immediately. In other words, only the active Master schedules resources; a standby Master never does.
Pass the previously registered workers that are in the ALIVE state to Random.shuffle, which shuffles their order and returns them as a randomly ordered sequence.
Record the number of workers in the returned sequence, i.e. the number of alive workers.
Driver scheduling is then done with a for loop. Drivers are only scheduled here for applications submitted in cluster deploy mode, because only then is the driver registered with the Master; in client mode (standalone client and yarn-client alike) the driver is started on the submitting client and never needs to be scheduled.
The for loop iterates over the waitingDrivers ArrayBuffer; inside it, a while loop keeps going as long as there are still alive workers that have not been visited and the driver has not yet been launched.
If the current worker's free memory is at least the memory the driver needs and its free CPU cores are at least the cores the driver needs, the driver is launched and removed from the waitingDrivers queue.
The launch is done by launchDriver: the driver is added to the worker's internal cache, the driver's memory and cores are subtracted from the worker's free memory and cores, the worker is also recorded in the driver's cache structure, then the worker's actor is sent a LaunchDriver message telling it to start the driver, and finally the driver's state is set to RUNNING.
Once the drivers have been handled, applications are scheduled. There are two algorithms, spreadOutApps and non-spreadOutApps; the choice is controlled by the SparkConf property ("spark.deploy.spreadOut", true), which defaults to true, so spreadOutApps is enabled by default.
With spreadOutApps, a for loop walks the applications in waitingApps, with an if guard keeping only applications that still need CPU cores. Inside the loop the workers are filtered again to those that are ALIVE and usable by the application, and then sorted by free CPU cores in descending order (a worker is usable by an application only if its free memory is at least the memory one of the application's executors needs and it has not already been used by that application).
An array is then created to record how many cores will be assigned on each usable worker, and the total number of cores to assign is computed as the minimum of the cores the application still needs and the sum of the usable workers' free cores.
A while loop then walks the workers: whenever the current worker still has unassigned free cores, the total left to assign is decreased by 1, that worker's assigned count is increased by 1, and the pointer moves on to the next worker; this repeats until all the cores have been handed out. The result of this algorithm is that the application's cores are spread as evenly as possible across the workers, so the application runs on as many nodes as possible.
After the cores have been assigned, the workers that actually received cores are walked again: for each of them an executor is added to the application's internal cache by creating an ExecutorDesc object that records how many CPU cores this executor is given, the executor is then launched on that worker, and the application's state is set to RUNNING.
The non-spreadOutApps algorithm does exactly the opposite: it uses up all of one worker's CPU cores before moving on to the next worker. A small standalone sketch contrasting the two strategies is shown right after this step list.
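To make the difference between the two strategies concrete, here is a small self-contained Scala sketch (not Spark code) that simulates both core-assignment strategies; the freeCores and coresLeft values below are purely hypothetical.

object CoreAssignmentDemo {
  // Spread-out strategy: hand out one core at a time in round-robin fashion,
  // so the application's cores end up as evenly spread as possible.
  def spreadOut(freeCores: Array[Int], coresLeft: Int): Array[Int] = {
    val assigned = new Array[Int](freeCores.length)
    var toAssign = math.min(coresLeft, freeCores.sum)
    var pos = 0
    while (toAssign > 0) {
      if (freeCores(pos) - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % freeCores.length
    }
    assigned
  }

  // Non-spread-out strategy: exhaust one worker's free cores before moving to the next.
  def packed(freeCores: Array[Int], coresLeft: Int): Array[Int] = {
    val assigned = new Array[Int](freeCores.length)
    var toAssign = math.min(coresLeft, freeCores.sum)
    for (pos <- freeCores.indices if toAssign > 0) {
      val grant = math.min(freeCores(pos), toAssign)
      assigned(pos) = grant
      toAssign -= grant
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val freeCores = Array(8, 8, 8)             // three hypothetical workers
    println(spreadOut(freeCores, 12).toList)   // List(4, 4, 4): spread across all workers
    println(packed(freeCores, 12).toList)      // List(8, 4, 0): packed onto as few workers as possible
  }
}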
The core source code is shown below:
private def schedule() {
  /*
   * First check whether the Master is in the ALIVE state
   */
  if (state != RecoveryState.ALIVE) { return }
  // First schedule drivers, they take strict precedence over applications
  // Randomization helps balance drivers
  // Pass the previously registered workers that are ALIVE to Random.shuffle, which shuffles their order
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Number of currently usable (alive) workers, after shuffling
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  /*
   * Driver scheduling: iterate over the waitingDrivers ArrayBuffer.
   * A driver is only registered here (and therefore scheduled) when it is submitted in
   * cluster deploy mode; in client mode (standalone client and yarn-client) the driver
   * is started locally on the submitting client, so it is never registered or scheduled.
   */
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    // Keep iterating while there are alive workers we have not visited and the driver has not been launched
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // If this worker's free memory >= the memory the driver needs and its free cores >= the cores the driver needs
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // Launch the driver
        launchDriver(worker, driver)
        // and remove it from the waitingDrivers queue
        waitingDrivers -= driver
        launched = true
      }
      // Move the pointer to the next worker
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  /*
   * Application scheduling.
   * There are two algorithms; which one is used can be set in SparkConf and defaults to
   * true, i.e. the spreadOutApps algorithm.
   */
  if (spreadOutApps) {
    // Try to spread out each app among all the nodes, until it has all its cores
    // Iterate over the applications in waitingApps, keeping (via the if guard) only those that still need cores
    for (app <- waitingApps if app.coresLeft > 0) {
      // Filter the workers again to those that are ALIVE and usable by this application,
      // then sort them by free cores in descending order.
      // A worker is usable by the application only if its free memory is at least the memory one
      // of the application's executors needs and it has not already been used by this application.
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
      // Create an array that records how many cores to assign on each usable worker
      val numUsable = usableWorkers.length
      val assigned = new Array[Int](numUsable) // Number of cores to give on each node
      // Total cores to assign: the minimum of the cores the app still needs and the workers' total free cores
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      var pos = 0
      // Round-robin assignment: while there are cores left to assign, give the current worker
      // one more core if it still has a free one, then move the pointer to the next worker
      while (toAssign > 0) {
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % numUsable
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      // Walk the usable workers now that the cores have been assigned
      for (pos <- 0 until numUsable) {
        // Only workers that were actually assigned cores
        if (assigned(pos) > 0) {
          // First add an executor to the application's internal cache:
          // an ExecutorDesc object is created, recording how many CPU cores this executor gets
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
          // Then launch the executor on that worker
          launchExecutor(usableWorkers(pos), exec)
          // and set the application's state to RUNNING
          app.state = ApplicationState.RUNNING
        }
      }
    }
  } else {
    // Pack each app into as few nodes as possible until we've assigned all its cores
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      for (app <- waitingApps if app.coresLeft > 0) {
        if (canUse(app, worker)) {
          val coresToUse = math.min(worker.coresFree, app.coresLeft)
          if (coresToUse > 0) {
            val exec = app.addExecutor(worker, coresToUse)
            launchExecutor(worker, exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    }
  }
}
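The canUse helper used above to filter usable workers is not part of this listing. Paraphrased from the Spark 1.3 Master (treat the exact wording as an approximation), it looks roughly like the following: a worker is usable for an application only if its free memory covers one executor (memoryPerSlave) and it is not already running an executor of that application.

// Roughly the Spark 1.3 helper (paraphrased, shown here only for reference)
def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
  worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
}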
// Launching an executor
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Add the executor to the worker's internal cache
  worker.addExecutor(exec)
  // Send the worker a LaunchExecutor message
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // Send an ExecutorAdded message to the application (driver) this executor belongs to
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
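For context on the app.addExecutor calls in schedule(), the sketch below is a simplified paraphrase of the application-side bookkeeping in Spark 1.3's ApplicationInfo, not the exact source: it creates the ExecutorDesc that records the granted cores and the per-executor memory, and caches it in the application's executor map. Treat the exact signature and field names as assumptions.

// Simplified paraphrase of ApplicationInfo.addExecutor (illustrative, not the exact code)
def addExecutor(worker: WorkerInfo, cores: Int): ExecutorDesc = {
  val exec = new ExecutorDesc(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
  executors(exec.id) = exec   // cache the executor in this application's executor map
  coresGranted += cores       // track how many cores this application has been granted so far
  exec
}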
// Launching a driver
def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Add the driver to the worker's internal cache structure,
  // which also subtracts the driver's memory and cores from the worker's free memory and cores
  worker.addDriver(driver)
  // The worker is also recorded in the driver's internal cache structure
  driver.worker = Some(worker)
  // Then send the worker's actor a LaunchDriver message, telling it to start the driver
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  // Finally set the driver's state to RUNNING
  driver.state = DriverState.RUNNING
}
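Finally, regarding the spark.deploy.spreadOut setting mentioned above: the Master reads it from its SparkConf with a getBoolean lookup that defaults to true. The snippet below is a minimal sketch of that lookup and of how the property could be switched off; the spark-defaults.conf location is an assumption that depends on how the master is deployed.

import org.apache.spark.SparkConf

object SpreadOutFlagDemo {
  def main(args: Array[String]): Unit = {
    // The Master performs essentially this lookup on its own configuration
    val conf = new SparkConf()
    val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
    println(s"spreadOutApps = $spreadOutApps")   // true unless the property is overridden

    // To pack executors onto as few workers as possible instead, set the property to
    // false in the Master's configuration, for example in conf/spark-defaults.conf
    // on the master host:
    //   spark.deploy.spreadOut   false
  }
}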