您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“Java線程池execute()方法怎么用”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Java線程池execute()方法怎么用”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
* Thread pools address two different problems: they usually * provide improved performance when executing large numbers of * asynchronous tasks, due to reduced per-task invocation overhead, * and they provide a means of bounding and managing the resources, * including threads, consumed when executing a collection of tasks. * Each {@code ThreadPoolExecutor} also maintains some basic * statistics, such as the number of completed tasks.
線程池處理了兩個不同的問題,線程池通過減少線程正式調用之前的開銷來給大量異步任務更優秀的表現,與此同時給出了一系列綁定管理任務線程的一種手段。每個線程池都包含了一些基本信息,比如內部完成的任務數量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl作為AtomicInteger類存放了類中的兩種信息,在其中由高3位來保存線程池的狀態,后29位來保存此時線程池中的Woker類線程數量(由此可知,線程池中的線程數量最高可以接受大約在五億左右)。由此可見給出的runStateOf()和workerCountOf()方法分別給出了查看線程狀態和線程數量的方法。
讓我們看作者給出的注釋
* RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed
RUNNING
狀態可以接受新進來的任務,同時也會執行隊列里的任務。
SHUTDOWN
狀態已經不會再接受新任務,但仍舊會處理隊列中的任務。
STOP
狀態在之前的基礎上,不會處理隊列中的人物,在執行的任務也會直接被打斷。
TIDYING
狀態在之前的基礎上,所有任務都已經終止,池中的Worker線程都已經為0,也就是stop狀態在清理完所有工作線程之后就會進入該狀態,同時在shutdown狀態在隊列空以及工作線程清理完畢之后也會直接進入這個階段,這一階段會循環執行terminated()方法。
TERMINATED
狀態作為最后的狀態,在之前的基礎上terminated()方法也業已執行完畢,才會從上個狀態進入這個狀態,代表線程池已經完全停止。
由于線程池的狀態都是通過AtomicInteger來保存的,可以通過比較的方式簡單的得到當前線程狀態。
private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize;
corePoolSize
表示線程池中允許存活最少的工作線程數量,但值得注意的是如果allowCoreThreadTimeOut一旦設置true(默認false),每個線程的存活時間只有keepAliveTime也就是說在allowCoreThreadTimeOut為true的時候,該線程池最小的工作線程數量為0;maximumPoolSize代表線程池中最大的工作線程數量。
keepAliveTime
為線程池中工作線程數量大于corePoolSize時,每個工作線程的在等待工作時最長的等待時間。
workQueue
作為線程池的任務等待隊列,這個將在接下來的execute()里詳細解釋。
Workers
作為存放線程池中存放工作線程的容器。
largestPoolSize
用來記錄線程池中存在過的最大的工作線程數量。
completedTaskCount
用來記錄線程池完成的任務的總數。
Handler
作為線程池中在不能接受任務的時候的拒絕策略,我們可以實現自己的拒絕策略,在實現了RejectedExecutionHandler接口的前提下。下面是線程池的默認拒絕策略,
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
threadFactory作為線程池生產線程的工廠類
public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
execute()內部的調用邏輯非常清晰。
如果當前線程池的工作線程數量小于corePoolSize,那么直接調用addWoker(),來添加工作線程。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
這段方法比較長,但整體的邏輯還是清晰的。
首先判斷當前線程池的狀態,如果已經狀態不是shutdown或者running,或者已經為shutdown但是工作隊列已經為空,那么這個時候直接返回添加工作失敗。接下來是對線程池線程數量的判斷,根據調用時的core的值來判斷是跟corePoolSize還是 maximumPoolSize判斷。
在確認了線程池狀態以及線程池中工作線程數量之后,才真正開始添加工作線程。
新建立一個worker類(線程池的內部類,具體的工作線程),將要執行的具體線程做為構造方法中的參數傳遞進去,接下來將其加入線程池的工作線程容器workers,并且更新工作線程最大量,最后調用worker工作線程的start()方法,就完成了工作線程的建立與啟動。
讓我們回到execute()方法,如果我們在一開始的線程數量就大于corePoolSize,或者我們在調用addworker()方法的過程中出現了問題導致添加工作線程數量失敗,那么我們會繼續執行接下來的邏輯。
在判斷完畢線程池的狀態后,則會將任務通過workQueue.offer())方法試圖加進任務隊列。Offer()方法的具體實現會根據在線程池構造方法中選取的任務隊列種類而產生變化。
但是如果成功加入了任務隊列,仍舊需要注意判斷如果線程池的狀態如果已經不是running那么會拒絕執行這一任務并執行相應的拒絕策略。在最后需要記得成功加入隊列成功后如果線程池中如果已經沒有了工作線程,需要重新建立一個工作線程去執行仍舊在任務隊列中等待執行的任務。
如果在之前的前提下加入任務隊列也失敗了(比如任務隊列已滿),則會在不超過線程池最大線程數量的前提下建立一個工作線程來處理。
如果在最后的建立工作線程也失敗了,那么我們只有很遺憾的執行任務的拒絕策略了。
在之前的過程中我們建立了工作線程Worker()類,那么我們現在看看worker類的內部實現,也可以說是線程池的核心部分。
接下來是Worker()類的成員
final Thread thread; Runnable firstTask; volatile long completedTasks;
thread
作為worker的工作線程空間,由線程池中所設置的線程工廠生成。
firstTask
則是worker在構造方法中所接受到的所要執行的任務。
completedTasks
作為該worker類所執行完畢的任務總數。
接下來我們可以看最重要的,也就是我們之前建立完Worker類之后立馬調用的run()方法了
public void run() { runWorker(this); }
我們可以繼續追蹤下去
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
如果這個worker還沒有執行過在構造方法就傳入的任務,那么在這個方法中,會直接執行這一任務,如果沒有,則會嘗試去從任務隊列當中去取的新的任務。
但是在真正調用任務之前,仍舊會判斷線程池的狀態,如果已經不是running亦或是shutdwon,則會直接確保線程被中斷。如果沒有,將會繼續執行并確保不被中斷。
接下來可見,我們所需要的任務,直接在工作線程中直接以run()方式以非線程的方式所調用,這里也就是我們所需要的任務真正執行的地方。
在執行完畢后,工作線程的使命并沒有真正宣告段落。在while部分worker仍舊會通過getTask()方法試圖取得新的任務。
private Runnable getTask() { boolean timedOut = false; retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
首先仍舊會判斷線程池的狀態是否是running還是shutdown以及stop狀態下隊列是否仍舊有需要等待執行的任務。如果狀態沒有問題,則會跟據allowCoreThreadTimeOut和corePoolSize的值通過對前面這兩個屬性解釋的方式來選擇從任務隊列中獲得任務的方式(是否設置timeout)。其中的timedOut保證了確認前一次試圖取任務時超時發生的記錄,以確保工作線程的回收。
調用了processWorkerExist()方法來執行工作線程的回收。
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } addWorker(null, false); } }
在這一方法中,首先確保已經重新更新了線程池中工作線程的數量,之后從線程池中的工作線程容器移去當前工作線程,并且將完成的任務總數加到線程池的任務總數當中。
在最后仍舊要確保線程池中依舊存在大于等于最小線程數量的工作線程數量存在,如果沒有,則重新建立工作線程去等待處理任務隊列中任務。
讀到這里,這篇“Java線程池execute()方法怎么用”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。