您好,登錄后才能下訂單哦!
本篇內容主要講解“Android Dispatchers.IO線程池源碼分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Android Dispatchers.IO線程池源碼分析”吧!
在協程中,當需要執行IO任務時,會在上下文中指定Dispatchers.IO來進行線程的切換調度。 而IO實際上是CoroutineDispatcher類型的對象,實際的值為DefaultScheduler類的常量對象IO,代碼如下:
public actual object Dispatchers { ... @JvmStatic public val IO: CoroutineDispatcher = DefaultScheduler.IO }
DefaultScheduler類繼承自ExperimentalCoroutineDispatcher類,內部提供了類型為LimitingDispatcher的IO對象,代碼如下:
// 系統配置變量 public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.parallelism" ... // 表示不會阻塞的任務,純CPU任務 internal const val TASK_NON_BLOCKING = 0 // 表示執行過程中可能會阻塞的任務,非純CPU任務 internal const val TASK_PROBABLY_BLOCKING = 1 ... // 默認線程池名稱 internal const val DEFAULT_DISPATCHER_NAME = "Dispatchers.Default" ... internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { // 創建名為Dispatchers.IO的線程池 // 最大并發數量為kotlinx.coroutines.io.parallelism指定的值,默認為64與CPU數量中的較大者 // 默認的執行的任務類型為TASK_PROBABLY_BLOCKING val IO: CoroutineDispatcher = LimitingDispatcher( this, systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), "Dispatchers.IO", TASK_PROBABLY_BLOCKING ) override fun close() { throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed") } // 可以看出IO和Default共用一個線程池 override fun toString(): String = DEFAULT_DISPATCHER_NAME @InternalCoroutinesApi @Suppress("UNUSED") public fun toDebugString(): String = super.toString() }
LimitingDispatcher類繼承自ExecutorCoroutineDispatcher類,實現了TaskContext接口和Executor接口。
LimitingDispatcher類的核心是構造方法中類型為ExperimentalCoroutineDispatcher的dispatcher對象。
LimitingDispatcher類看起來是一個標準的線程池,但實際上LimitingDispatcher類只對類參數中傳入的dispatcher進行包裝和功能擴展。如同名字中的litmit一樣,LimitingDispatcher類主要用于對任務執行數量進行限制,代碼如下:
// dispatcher參數傳入了DefaultScheduler對象 // parallelism表示并發執行的任務數量 // name表示線程池的名字 // taskMode表示任務模式,TaskContext接口中的常量 private class LimitingDispatcher( private val dispatcher: ExperimentalCoroutineDispatcher, private val parallelism: Int, private val name: String?, override val taskMode: Int ) : ExecutorCoroutineDispatcher(), TaskContext, Executor { // 用于保存任務的隊列 private val queue = ConcurrentLinkedQueue<Runnable>() // 用于記錄當前正在執行的任務的數量 private val inFlightTasks = atomic(0) // 獲取當前線程池 override val executor: Executor get() = this // Executor接口的實現,線程池的核心方法,通過dispatch實現 override fun execute(command: Runnable) = dispatch(command, false) override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher") // CoroutineDispatcher接口的實現 override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false) // 任務分發的核心方法 private fun dispatch(block: Runnable, tailDispatch: Boolean) { // 獲取當前要執行的任務 var taskToSchedule = block // 死循環 while (true) { // 當前執行的任務數加一,也可理解生成生成當前要執行的任務的編號 val inFlight = inFlightTasks.incrementAndGet() // 如果當前需要執行的任務數小于允許的并發執行任務數量,說明可以執行, if (inFlight <= parallelism) { // 調用參數中的dispatcher對象,執行任務 dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch) // 返回,退出循環 return } // 如果達到的最大并發數的限制,則將任務加入到隊列中 queue.add(taskToSchedule) // 下面的代碼防止線程競爭導致任務卡在隊列里不被執行,case如下: // 線程1:inFlightTasks = 1 ,執行任務 // 線程2:inFlightTasks = 2,當前達到了parallelism限制, // 線程1:執行結束,inFlightTasks = 1 // 線程2:將任務添加到隊列里,執行結束,inFlightTasks = 0 // 由于未執行,因此這里當前執行的任務數先減一 // 減一后如果仍然大于等于在大并發數,則直接返回,退出循環 if (inFlightTasks.decrementAndGet() >= parallelism) { return } // 如果減一后,發現可以執行任務,則從隊首獲取任務,進行下一次循環 // 如果隊列為空,說明沒有任務,則返回,退出循環 taskToSchedule = queue.poll() ?: return } } // CoroutineDispatcher接口的實現,用于yield掛起協程時的調度處理 override fun dispatchYield(context: CoroutineContext, block: Runnable) { // 也是通過dispatch方法實現,注意這里tailDispatch參數為true dispatch(block, tailDispatch = true) } override fun toString(): String { return name ?: "${super.toString()}[dispatcher = $dispatcher]" } // TaskContext接口的實現,用于在一個任務執行完進行回調 override fun afterTask() { // 從隊首獲取一個任務 var next = queue.poll() // 若可以獲取到 if (next != null) { // 則執行任務,注意這里tailDispatch參數為true dispatcher.dispatchWithContext(next, this, true) // 返回 return } // 任務執行完畢,當前執行的任務數量減一 inFlightTasks.decrementAndGet() // 下面的代碼防止線程競爭導致任務卡在隊列里不被執行,case如下: // 線程1:inFlightTasks = 1 ,執行任務 // 線程2:inFlightTasks = 2 // 線程1:執行結束,執行afterTask方法,發現隊列為空,此時inFlightTasks = 2 // 線程2:inFlightTasks當前達到了parallelism限制, // 將任務加入到隊列中,執行結束,inFlightTasks = 1 // 線程1:inFlightTasks=1,執行結束 // 從隊列中取出任務,隊列為空則返回 next = queue.poll() ?: return // 執行任務,注意這里tailDispatch參數為true dispatch(next, true) } }
dispatcher的dispatch方法定義在ExperimentalCoroutineDispatcher類中。
ExperimentalCoroutineDispatcher類繼承自ExecutorCoroutineDispatcher類,代碼如下:
// corePoolSize線程池核心線程數 // maxPoolSize表示線程池最大線程數 // schedulerName表示內部協程調度器的名字 // idleWorkerKeepAliveNs表示空閑的線程存活時間 @InternalCoroutinesApi public open class ExperimentalCoroutineDispatcher( private val corePoolSize: Int, private val maxPoolSize: Int, private val idleWorkerKeepAliveNs: Long, private val schedulerName: String = "CoroutineScheduler" ) : ExecutorCoroutineDispatcher() { // 我們在DefaultScheduler類中就是通過默認的構造方法, // 創建的父類ExperimentalCoroutineDispatcher對象 public constructor( corePoolSize: Int = CORE_POOL_SIZE, maxPoolSize: Int = MAX_POOL_SIZE, schedulerName: String = DEFAULT_SCHEDULER_NAME ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName) ... // 創建coroutineScheduler對象 private var coroutineScheduler = createScheduler() // 核心的分發方法 override fun dispatch(context: CoroutineContext, block: Runnable): Unit = try { // 調用coroutineScheduler對象的dispatch方法 coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { // 只有當coroutineScheduler正在關閉時,才會拒絕執行,拋出異常 DefaultExecutor.dispatch(context, block) } ... private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) ... } // 核心線程數 @JvmField internal val CORE_POOL_SIZE = systemProp( "kotlinx.coroutines.scheduler.core.pool.size", AVAILABLE_PROCESSORS.coerceAtLeast(2), // !!! at least two here minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE ) // 最大線程數 @JvmField internal val MAX_POOL_SIZE = systemProp( "kotlinx.coroutines.scheduler.max.pool.size", (AVAILABLE_PROCESSORS * 128).coerceIn( CORE_POOL_SIZE, CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE ), maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE ) // 空閑線程的存活時間 @JvmField internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos( systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L) )
在ExperimentalCoroutineDispatcher類的dispatch方法內部,通過調用類型為CoroutineScheduler的對象的dispatch方法實現。
在對CoroutineScheduler類的dispatch方法分析之前,首先分析一下CoroutineScheduler類的繼承關系,代碼如下:
// 實現了Executor和Closeable接口 // corePoolSize線程池核心線程數 // maxPoolSize表示線程池最大線程數 // schedulerName表示內部協程調度器的名字 // idleWorkerKeepAliveNs表示空閑的線程存活時間 internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { init { // 核心線程數量必須大于等于MIN_SUPPORTED_POOL_SIZE require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) { "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE" } // 最大線程數量必須大于等于核心線程數量 require(maxPoolSize >= corePoolSize) { "Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize" } // 最大線程數量必須小于等于MAX_SUPPORTED_POOL_SIZE require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) { "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE" } // 空閑的線程存活時間必須大于0 require(idleWorkerKeepAliveNs > 0) { "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive" } } ... // Executor接口中的實現,通過dispatch方法實現 override fun execute(command: Runnable) = dispatch(command) // Closeable接口中的實現,通過shutdown方法實現 override fun close() = shutdown(10_000L) ... }
接下來對CoroutineScheduler類中重要的全局變量進行分析,代碼如下:
// 用于存儲全局的純CPU(不阻塞)任務 @JvmField val globalCpuQueue = GlobalQueue() // 用于存儲全局的執行非純CPU(可能阻塞)任務 @JvmField val globalBlockingQueue = GlobalQueue() ... // 用于記錄當前處于Parked狀態(一段時間后自動終止)的線程的數量 private val parkedWorkersStack = atomic(0L) ... // 用于保存當前線程池中的線程 // workers[0]永遠為null,作為哨兵位 // index從1到maxPoolSize為有效線程 @JvmField val workers = AtomicReferenceArray<Worker?>(maxPoolSize + 1) ... // 控制狀態 private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT) // 表示已經創建的線程的數量 private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt() // 表示可以獲取的CPU令牌數量,初始值為線程池核心線程數量 private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value) // 獲取指定的狀態的已經創建的線程的數量 private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt() // 獲取指定的狀態的執行阻塞任務的數量 private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt() // 獲取指定的狀態的CPU令牌數量 public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt() // 當前已經創建的線程數量加1 private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet()) // 當前已經創建的線程數量減1 private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement()) // 當前執行阻塞任務的線程數量加1 private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT) // 當前執行阻塞任務的線程數量減1 private inline fun decrementBlockingTasks() { controlState.addAndGet(-(1L shl BLOCKING_SHIFT)) } // 嘗試獲取CPU令牌 private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state -> val available = availableCpuPermits(state) if (available == 0) return false val update = state - (1L shl CPU_PERMITS_SHIFT) if (controlState.compareAndSet(state, update)) return true } // 釋放CPU令牌 private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT) // 表示當前線程池是否關閉 private val _isTerminated = atomic(false) val isTerminated: Boolean get() = _isTerminated.value companion object { // 用于標記一個線程是否在parkedWorkersStack中(處于Parked狀態) @JvmField val NOT_IN_STACK = Symbol("NOT_IN_STACK") // 線程的三個狀態 // CLAIMED表示線程可以執行任務 // PARKED表示線程暫停執行任務,一段時間后會自動進入終止狀態 // TERMINATED表示線程處于終止狀態 private const val PARKED = -1 private const val CLAIMED = 0 private const val TERMINATED = 1 // 以下五個常量為掩碼 private const val BLOCKING_SHIFT = 21 // 2x1024x1024 // 1-21位 private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1 // 22-42位 private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT // 42 private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2 // 43-63位 private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT // 以下兩個常量用于require中參數判斷 internal const val MIN_SUPPORTED_POOL_SIZE = 1 // 2x1024x1024-2 internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2 // parkedWorkersStack的掩碼 private const val PARKED_INDEX_MASK = CREATED_MASK // inv表示01反轉 private const val PARKED_VERSION_MASK = CREATED_MASK.inv() private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT }
CoroutineScheduler類中對線程的狀態與權限控制:
availableCpuPermits的初始值為參數中核心線程數corePoolSize的值,表示CoroutineScheduler類中最多只有corePoolSize個核心線程。執行純CPU任務的線程每次執行任務之前需要在availableCpuPermits中進行記錄與申請。blockingTasks表示執行非純CPU任務的數量。這部分線程在執行時不需要CPU令牌。createdWorkers表示當前線程池中所有線程的數量,每個線程在創建或終止時都需要通過在這里進行記錄。這些變量的具體關系如下:
createdWorkers = blockingTasks + corePoolSize - availableCpuPermits
CPU令牌是線程池自定義的概念,不代表時間片,只是為了保證核心線程的數量。
在分析CoroutineScheduler類的dispatch方法之前,還需要分析一下CoroutineScheduler類中的兩個重要的內部類Worker類以及其對應的狀態類WorkerState類。
Worker是一個線程池中任務的核心執行者,幾乎在所有的線程池中都存在Worker的概念。
首先分析一下WorkerState類,代碼如下:
// 一個枚舉類,表示Worker的狀態 enum class WorkerState { // 擁有了CPU令牌,可以執行純CPU任務,也可以執行非純CPU任務 CPU_ACQUIRED, // 可以執行非純CPU任務 BLOCKING, // 當前已經暫停,一段時間后將終止,也有可能被再次使用 PARKING, // 休眠狀態,用于初始狀態,只能執行自己本地任務 DORMANT, // 終止狀態,將不再被使用 TERMINATED }
接下來對Worker類的繼承關系以及其中重要的全局變量進行分析,代碼如下:
// 繼承自Thread類 // 私有化無參的構造方法 internal inner class Worker private constructor() : Thread() { init { // 標記為守護線程 isDaemon = true } // 當前線程在存儲線程池線程的數組workers中的索引位置 @Volatile var indexInArray = 0 set(index) { // 設置線程名 name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}" field = index } // 構造方法 constructor(index: Int) : this() { indexInArray = index } // 獲取當前線程的調度器 inline val scheduler get() = this@CoroutineScheduler // 線程存儲任務的本地隊列 @JvmField val localQueue: WorkQueue = WorkQueue() // 線程的狀態 (內部轉換) @JvmField var state = WorkerState.DORMANT // 線程的控制狀態(外部賦予) val workerCtl = atomic(CLAIMED) // 終止截止時間,表示處于PARKING狀態的線程,在terminationDeadline毫秒后終止 private var terminationDeadline = 0L // 表示當線程處于PARKING狀態,進入parkedWorkersStack后, // 下一個處于PARKING狀態并進入parkedWorkersStack的線程的引用 @Volatile var nextParkedWorker: Any? = NOT_IN_STACK // 偷取其他線程的本地隊列的任務的冷卻時間,后面會解釋 private var minDelayUntilStealableTaskNs = 0L // 生成隨機數,配合算法,用于任務尋找 private var rngState = Random.nextInt() ... // 表示當前線程的本地隊列是否有任務 @JvmField var mayHaveLocalTasks = false ... }
接下來分析Worker類的核心方法——run方法的實現,代碼入下:
override fun run() = runWorker() private fun runWorker() { // 用于配合minDelayUntilStealableTaskNs自旋 var rescanned = false // 線程池未關閉,線程沒有終止,則循環 while (!isTerminated && state != WorkerState.TERMINATED) { // 尋找并獲取任務 val task = findTask(mayHaveLocalTasks) // 如果找到了任務 if (task != null) { // 重制兩個變量 rescanned = false minDelayUntilStealableTaskNs = 0L // 執行任務 executeTask(task) // 繼續循環 continue } else { // 如果沒有找到任務,說明本地隊列肯定沒有任務,因為本地隊列優先查找 // 設置標志位 mayHaveLocalTasks = false } // 走到這里,說明沒有找到任務 // 如果偷取任務的冷卻時間不為0,說明之前偷到過任務 if (minDelayUntilStealableTaskNs != 0L) { // 這里通過rescanned,首次minDelayUntilStealableTaskNs不為0, // 不會立刻進入PARKING狀態,而是再次去尋找任務 // 因為當過多的線程進入PARKING狀態,再次喚起大量的線程很難控制 if (!rescanned) { rescanned = true } else {// 再次掃描,仍然沒有找到任務 // 置位 rescanned = false // 嘗試釋放CPU令牌,并進入WorkerState.PARKING狀態 tryReleaseCpu(WorkerState.PARKING) // 清除中斷標志位 interrupted() // 阻塞minDelayUntilStealableTaskNs毫秒 LockSupport.parkNanos(minDelayUntilStealableTaskNs) // 清零 minDelayUntilStealableTaskNs = 0L } // 阻塞完成后繼續執行 continue } // 走到這里,說明線程可能很長時間都沒有執行任務了,則對其進行暫停處理 // tryPark比tryReleaseCpu要嚴格的多,會被線程會被計入到parkedWorkersStack, // 同時會修改workerCtl狀態 tryPark() } // 退出循環 // 嘗試釋放CPU令牌,并進入終止狀態 tryReleaseCpu(WorkerState.TERMINATED) }
接下來分析Worker線程如何尋找任務,代碼如下:
// 尋找任務 fun findTask(scanLocalQueue: Boolean): Task? { // 嘗試獲取CPU令牌,如果獲取到了,則調用findAnyTask方法,尋找任務 if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue) // 如果沒有獲取到CPU令牌,只能去找非純CPU任務了 // 如果允許掃描本地的任務隊列,則優先在本地隊列中尋找, // 找不到則在全局隊列中尋找,從隊首中獲取 val task = if (scanLocalQueue) { localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull() } else { globalBlockingQueue.removeFirstOrNull() } // 如果在本地隊列和全局隊列中都找不到,則嘗試去其他線程的隊列里偷一個任務 return task ?: trySteal(blockingOnly = true) } // 尋找CPU任務 private fun findAnyTask(scanLocalQueue: Boolean): Task? { // 如果允許掃描本地的任務隊列,則在本地隊列和全局隊列中隨機二選一, // 找不到則在全局隊列中尋找,從隊首中獲取 if (scanLocalQueue) { // 隨機確定本地隊列和全局隊列的優先順序 val globalFirst = nextInt(2 * corePoolSize) == 0 // 獲取任務 if (globalFirst) pollGlobalQueues()?.let { return it } localQueue.poll()?.let { return it } if (!globalFirst) pollGlobalQueues()?.let { return it } } else { // 只能從全局獲取 pollGlobalQueues()?.let { return it } } // 走到這里,說明本地隊列和全局隊列中都找不到 // 那么就嘗試去其他線程的隊列里偷一個任務 return trySteal(blockingOnly = false) } // 從全局隊列獲取任務 private fun pollGlobalQueues(): Task? { // 隨機獲取CPU任務或者非CPU任務 if (nextInt(2) == 0) { // 優先獲取CPU任務 globalCpuQueue.removeFirstOrNull()?.let { return it } return globalBlockingQueue.removeFirstOrNull() } else { // 優先獲取非CPU任務 globalBlockingQueue.removeFirstOrNull()?.let { return it } return globalCpuQueue.removeFirstOrNull() } } // 偷取其他線程的本地隊列的任務 // blockingOnly表示是否只偷取阻塞任務 private fun trySteal(blockingOnly: Boolean): Task? { // 只有當前線程的本地隊列為空的時候,才能偷其他線程的本地隊列 assert { localQueue.size == 0 } // 獲取已經存在的線程的數量 val created = createdWorkers // 如果線程總數為0或1,則不偷取,直接返回 // 0:需要等待初始化 // 1:避免在單線程機器上過度偷取 if (created < 2) { return null } // 隨機生成一個存在的線程索引 var currentIndex = nextInt(created) // 默認的偷取冷卻時間 var minDelay = Long.MAX_VALUE // 循環遍歷 repeat(created) { // 每次循環索引自增,帶著下一行代碼表示,從位置currentIndex開始偷 ++currentIndex // 如果超出了,則從頭繼續 if (currentIndex > created) currentIndex = 1 // 從數組中獲取線程 val worker = workers[currentIndex] // 如果線程不為空,并且不是自己 if (worker !== null && worker !== this) { assert { localQueue.size == 0 } // 根據偷取的類型進行偷取 val stealResult = if (blockingOnly) { // 偷取非CPU任務到本地隊列中 localQueue.tryStealBlockingFrom(victim = worker.localQueue) } else { // 偷取任務到本地隊列中 localQueue.tryStealFrom(victim = worker.localQueue) } // 如果返回值為TASK_STOLEN,說明偷到了 // 如果返回值為NOTHING_TO_STEAL,說明要偷的線程的本地隊列是空的 if (stealResult == TASK_STOLEN) { // 從隊列的隊首拿出來返回 return localQueue.poll() // 如果返回值大于零,表示偷取的冷卻時間,說明沒有偷到 } else if (stealResult > 0) { // 說明至少還要等待stealResult時間才能偷取這個任務 // 計算偷取冷卻時間 minDelay = min(minDelay, stealResult) } } } // 設置偷取等待時間 minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0 // 返回空 return null } // 基于Marsaglia xorshift RNG算法 // 用于在2^32-1范圍內計算偷取目標 internal fun nextInt(upperBound: Int): Int { var r = rngState r = r xor (r shl 13) r = r xor (r shr 17) r = r xor (r shl 5) rngState = r val mask = upperBound - 1 // Fast path for power of two bound if (mask and upperBound == 0) { return r and mask } return (r and Int.MAX_VALUE) % upperBound }
通過對這部分代碼的分析,可以知道線程在尋找任務時,首先會嘗試獲取CPU令牌,成為核心線程。如果線程成為了核心線程,則隨機從本地或全局的兩個隊列中獲取一個任務,獲取不到則去隨機偷取一個任務。如果沒有獲取到CPU令牌,則優先在本地獲取任務,獲取不到則在全局非CPU任務隊列中獲取任務,獲取不到則去偷取一個非CPU任務。
如果偷取的任務沒有達到最小的可偷取時間,則返回需要等待的時間。如果偷取任務成功,則直接加入到本地隊列中。偷取的核心過程,會在后面進行分析。
接下來分析任務被獲取到后如何被執行,代碼如下:
// 執行任務 private fun executeTask(task: Task) { // 獲取任務類型,類型為純CPU或可能阻塞 val taskMode = task.mode // 重置線程閑置狀態 idleReset(taskMode) // 任務執行前 beforeTask(taskMode) // 執行任務 runSafely(task) // 任務執行后 afterTask(taskMode) } // 重置線程閑置狀態 private fun idleReset(mode: Int) { // 重置從PARKING狀態到TERMINATED狀態的時間 terminationDeadline = 0L // 如果當前狀態為PARKING,說明尋找任務時沒有獲取到CPU令牌 if (state == WorkerState.PARKING) { assert { mode == TASK_PROBABLY_BLOCKING } // 設置狀態為BLOCKING state = WorkerState.BLOCKING } } // 任務執行前 private fun beforeTask(taskMode: Int) { // 如果執行的任務為純CPU任務,說明當前線程獲取到了CPU令牌,是核心線程,直接返回 if (taskMode == TASK_NON_BLOCKING) return // 走到這里,說明線程執行的是非純CPU任務, // 沒有CPU令牌也可以執行,因此嘗試釋放CPU令牌,進入WorkerState.BLOCKING if (tryReleaseCpu(WorkerState.BLOCKING)) { // 如果釋放CPU令牌成功,則喚起一個線程去申請CPU令牌 signalCpuWork() } } // 執行任務 fun runSafely(task: Task) { try { task.run() } catch (e: Throwable) { // 異常發生時,通知當前線程的異常處理Handler val thread = Thread.currentThread() thread.uncaughtExceptionHandler.uncaughtException(thread, e) } finally { unTrackTask() } } // 任務執行后 private fun afterTask(taskMode: Int) { // 如果執行的任務為純CPU任務,說明當前線程獲取到了CPU令牌,是核心線程,直接返回 if (taskMode == TASK_NON_BLOCKING) return // 如果執行的是非CPU任務 // 當前執行的非CPU任務數量減一 decrementBlockingTasks() // 獲取當前線程狀態 val currentState = state // 如果線程當前不是終止狀態 if (currentState !== WorkerState.TERMINATED) { assert { currentState == WorkerState.BLOCKING } // 設置為休眠狀態 state = WorkerState.DORMANT } }
了解Worker類的工作機制后,接下來分析CoroutineScheduler類的dispatch方法,代碼如下:
// block表示要執行的任務 // taskContext表示任務執行的上下文,里面包含任務的類型,和執行完成后的回調 // tailDispatch表示當前任務是否進行隊列尾部調度, // 當tailDispatch為true時,當前block會在當前線程的本地隊列里的任務全部執行完后再執行 fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { // 上報時間,TimeSource相關,無需關注 trackTask() // 創建任務 val task = createTask(block, taskContext) // 獲取當前的Worker,可能獲取不到 val currentWorker = currentWorker() // 將當前的任務添加到當前線程的本地隊列中 val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) // 不為空,說明沒有添加進去,說明當前的線程不是Worker if (notAdded != null) { // 將任務添加到全局隊列中,如果添加失敗了 if (!addToGlobalQueue(notAdded)) { // 說明線程池正在關閉,拋出異常 throw RejectedExecutionException("$schedulerName was terminated") } } // skipUnpark表示是否跳過喚起狀態,取決于這下面兩個參數 val skipUnpark = tailDispatch && currentWorker != null // 如果當前類型為純CPU任務 if (task.mode == TASK_NON_BLOCKING) { // 如果跳過喚醒,則直接返回 if (skipUnpark) return // 喚醒一個執行純CPU任務的線程 signalCpuWork() } else { // 喚醒一個執行非CPU任務的線程 signalBlockingWork(skipUnpark = skipUnpark) } } // 創建任務 internal fun createTask(block: Runnable, taskContext: TaskContext): Task { // 獲取當前時間 val nanoTime = schedulerTimeSource.nanoTime() // 如果當前的block是Task類型的 if (block is Task) { // 重新設置提交時間和任務上下文 block.submissionTime = nanoTime block.taskContext = taskContext // 返回 return block } // 封裝成TaskImpl,返回 return TaskImpl(block, nanoTime, taskContext) } // 任務模型 // block表示執行的任務 // submissionTime表示任務提交時間 // taskContext表示任務執行的上下文 internal class TaskImpl( @JvmField val block: Runnable, submissionTime: Long, taskContext: TaskContext ) : Task(submissionTime, taskContext) { override fun run() { try { block.run() } finally { // 任務執行完畢后,會在同一個Worker線程中回調afterTask方法 taskContext.afterTask() } } override fun toString(): String = "Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $taskContext]" } // 將任務添加到本地隊列 private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? { // 如果當前線程為空,則返回任務 if (this == null) return task // 如果線程處于終止狀態,則返回任務 if (state === WorkerState.TERMINATED) return task // 如果任務為純CPU任務,但是線程沒有CPU令牌 if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) { // 則返回任務 return task } // 標記本地隊列有任務 mayHaveLocalTasks = true // 添加到隊列 return localQueue.add(task, fair = tailDispatch) } // 添加到全局隊列 private fun addToGlobalQueue(task: Task): Boolean { // 根據任務的類型,添加到全局隊列的隊尾 return if (task.isBlocking) { globalBlockingQueue.addLast(task) } else { globalCpuQueue.addLast(task) } } // 對當前線程進行強制轉換,如果調度器也是當前的調度器則返回Worker對象 private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this } // 喚起一個執行非純CPU任務的線程 private fun signalBlockingWork(skipUnpark: Boolean) { // 當前執行阻塞任務的線程數量加1,并獲取當前的控制狀態 val stateSnapshot = incrementBlockingTasks() // 如果跳過喚起,則返回 if (skipUnpark) return // 嘗試喚起,喚起成功,則返回 if (tryUnpark()) return // 喚起失敗,則根據當前的控制狀態,嘗試創建新線程,成功則返回 if (tryCreateWorker(stateSnapshot)) return // 再次嘗試喚起,防止多線程競爭情況下,上面的tryUnpark方法正好卡在線程釋放CPU令牌與進入PARKING狀態之間 // 因為線程先釋放CPU令牌,后進入PARKING狀態 tryUnpark() } // 喚起一個執行純CPU任務的線程 internal fun signalCpuWork() { // 嘗試喚起,喚起成功,則返回 if (tryUnpark()) return // 喚起失敗,則嘗試創建新線程,成功則返回 if (tryCreateWorker()) return // 再次嘗試喚起,防止多線程競爭情況下,上面的tryUnpark方法正好卡在線程釋放CPU令牌與進入PARKING狀態之間 // 因為線程先釋放CPU令牌,后進入PARKING狀態 tryUnpark() }
通過對上面的代碼進行分析,可以知道CoroutineScheduler類的dispatch方法,首先會對任務進行封裝。正常情況下,任務都會根據類型添加到全局隊列中,接著根據任務類型,隨機喚起一個執行對應類型任務的線程去執行任務。
當任務執行完畢后,會回調任務中自帶的afterTask方法。根據之前對LimitingDispatcher的分析,可以知道,此時tailDispatch參數為true,同時當前的線程也是Worker線程,因此會被直接添加到線程的本地隊列中,由于任務有對應的線程執行,因此跳過了喚起其他線程執行任務的階段。這里我們可以稱這個機制為尾調機制。
為什么CoroutineScheduler類中要設計一個尾調機制呢?
在傳統的線程池的線程充足情況下,一個任務到來時,會被分配一個線程。假設前后兩個任務A與B有依賴關系,需要在執行A再執行B,這時如果兩個任務同時到來,執行A任務的線程會直接執行,而執行B線程的任務可能需要被阻塞。而一旦線程阻塞會造成線程資源的浪費。而協程本質上就是多個小段程序的相互協作,因此這種場景會非常多,通過這種機制可以保證任務的執行順序,同時減少資源浪費,而且可以最大限度的保證一個連續的任務執行在同一個線程中。
至此,Dispatchers.IO線程池的工作原理全部分析完畢。
接下來分析一些更加細節的過程。首先分析一下Worker線程本地隊列調用的add方法是如何添加任務的,代碼如下:
// 本地隊列中存儲最后一次尾調的任務 private val lastScheduledTask = atomic<Task?>(null) // fair表示是否公平的執行任務,FIFO,默認為false fun add(task: Task, fair: Boolean = false): Task? { // fair為true,則添加到隊尾 if (fair) return addLast(task) // 如果fair為false,則從lastScheduledTask中取出上一個尾調的任務, // 并把這次的新尾調任務保存到lastScheduledTask val previous = lastScheduledTask.getAndSet(task) ?: return null // 如果獲取上一次的尾調任務不為空,則添加到隊尾 return addLast(previous) }
根據之前對Worker類的分析,任務偷取的核心代碼鎖定在了WorkQueue類的兩個方法上:一個是偷取非純CPU任務的tryStealBlockingFrom方法,另一個可以偷所有類型任務的tryStealFrom方法,代碼如下:
internal const val BUFFER_CAPACITY_BASE = 7 internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE // 1000 0000 internal const val MASK = BUFFER_CAPACITY - 1 // 0111 1111 // 存儲任務的數組,最多存儲128 private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY) // producerIndex表示上一次向任務數組中添加任務的索引 // consumerIndex表示上一次消費的任務索引 // producerIndex永遠大于等于consumerIndex // 二者差值就是當前任務數組中任務的數量 private val producerIndex = atomic(0) private val consumerIndex = atomic(0) // buffer中非純CPU任務的數量(避免遍歷掃描) private val blockingTasksInBuffer = atomic(0) // 偷所有類型任務 fun tryStealFrom(victim: WorkQueue): Long { assert { bufferSize == 0 } // 從要偷取線程的本地隊列中輪訓獲取一個任務 val task = victim.pollBuffer() // 如果獲取到了任務 if (task != null) { // 將它添加到自己的本地隊列中 val notAdded = add(task) assert { notAdded == null } // 返回偷取成功的標識 return TASK_STOLEN } // 如果偷取失敗,嘗試偷取指定線程的尾調任務 return tryStealLastScheduled(victim, blockingOnly = false) } // 輪訓獲取任務 private fun pollBuffer(): Task? { // 死循環 while (true) { // 獲取上一次消費的任務索引 val tailLocal = consumerIndex.value // 如果當前任務數組中沒有多處的任務,則返回空 if (tailLocal - producerIndex.value == 0) return null // 計算偷取位置,防止數組過界 val index = tailLocal and MASK // 通過CAS方式,將consumerIndex加一,表示下一次要從tailLocal + 1處開始偷取 if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) { // 從偷取位置初取出任務,如果偷取的任務為空,則繼續循環 val value = buffer.getAndSet(index, null) ?: continue // 偷取成功 // 若任務為阻塞任務,blockingTasksInBuffer的值減一 value.decrementIfBlocking() // 返回任務 return value } } } // 偷取非純CPU任務 fun tryStealBlockingFrom(victim: WorkQueue): Long { assert { bufferSize == 0 } // 從consumerIndex位置開始偷 var start = victim.consumerIndex.value // 偷到producerIndex處截止 val end = victim.producerIndex.value // 獲取任務數組 val buffer = victim.buffer // 循環偷取 while (start != end) { // 計算偷取位置,防止數組過界 val index = start and MASK // 如果非純CPU任務數為0,則直接退出循環 if (victim.blockingTasksInBuffer.value == 0) break // 獲取index處的任務 val value = buffer[index] // 如果任務存在,而且是非純CPU任務,同時成功的通過CAS設置為空 if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) { // blockingTasksInBuffer的值減一 victim.blockingTasksInBuffer.decrementAndGet() // 將偷取的任務添加到當前線程的本地隊列中 add(value) // 返回偷取成功標識 return TASK_STOLEN } else { // 如果偷取失敗,自增再次循環,從下一個位置開始偷 ++start } } // 如果從任務數組中偷取失敗,嘗試偷取指定線程的尾調任務 return tryStealLastScheduled(victim, blockingOnly = true) } // 偷取指定線程的尾調任務 private fun tryStealLastScheduled(victim: WorkQueue, blockingOnly: Boolean): Long { // 死循環 while (true) { // 獲取指定線程的尾調任務,如果任務不存在,則返回偷取失敗標識符 val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL // 如果要偷取的是非純CPU任務,但是任務類型為純CPU任務,說明只有核心線程才能偷 // 返回偷取失敗標識符 if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL // 獲取當前時間 val time = schedulerTimeSource.nanoTime() //計算任務從添加開始到現在經過的時長 val staleness = time - lastScheduled.submissionTime // 如果時長小于偷取冷卻時間 if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) { // 返回當前線程需要等待的時間 return WORK_STEALING_TIME_RESOLUTION_NS - staleness } // 通過CAS,將lastScheduledTask設置為空,防止被其他線程執行 if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) { // 偷取成功,加入到當前線程的隊列中 add(lastScheduled) // 返回偷取成功表示 return TASK_STOLEN } // 繼續循環 continue } } // 偷取冷卻時間,尾調任務從添加開始, // 最少經過WORK_STEALING_TIME_RESOLUTION_NS時間才可以被偷 @JvmField internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp( "kotlinx.coroutines.scheduler.resolution.ns", 100000L )
CoroutineScheduler類是核心的線程池,用于任務的執行。LimitingDispatcher類對CoroutineScheduler類進行代理,是CoroutineScheduler類尾調機制的使用者,對任務進行初步排隊。
LimitingDispatcher類中的任務隊列。CoroutineScheduler類中的兩個全局隊列。Worker類中的本地隊列。
一個任務執行完,可以通過回調,在同一個Worker線程中再存儲一個待執行任務,該任務將在Worker線程本地隊列目前已存在的任務,執行完畢后再執行。
所有任務分成純CPU任務和非純CPU任務兩種,對應著核心線程和非核心線程。
所有線程在執行前都先嘗試成為核心線程,核心線程可以從兩種任務中任意選擇執行,非核心線程只能執行非純CPU任務。核心線程如果選擇執行非純CPU任務會變成非核心線程
WorkQueue類根據隨機算法提供任務偷取機制,一個Worker線程可以從其他Worker線程的本地隊列中偷取任務。
到此,相信大家對“Android Dispatchers.IO線程池源碼分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。