您好,登錄后才能下訂單哦!
池技術是性能優化的重要手段:連接池,線程池已經是開發中的標配了。面試中這個知識點也是高頻問題。抽空學習了Java的ThreadPoolExecutor, 把學習的思路記錄一下。
由于線程的創建和銷毀都是系統層面的操作,涉及到系統資源的占用和回收,所以創建線程是一個重量級的操作。為了提升性能,就引入了線程池;即線程復用。Java不僅提供了線程池,還提供了線程池的操作工具類。 我們由淺入深了解一下。
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadDemo {
static class Worker implements Runnable{
public void run(){
System.out.println("run work "+Thread.currentThread().getName() );
}
}
public static void main(String[] args) {
Worker w1 = new Worker();
ExecutorService service = Executors.newFixedThreadPool(10);
service.submit(w1);
service.shutdown();
}
}
看Executors的源碼,發現其使用的是ThreadPoolExecutor。 研究一下ThreadPoolExecutor, 發現其默認的參數Executors.defaultThreadFactory(), defaultHandler
。線程池的創建工廠默認如下:
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
也就是自定義了一下線程的名字,將線程歸到了同一個組。
線程池的defaultHandler
如下:
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
也就是說,當提交的任務超過線程池的容量,那么就會拋出RejectedExecutionException
異常。 但是使用Executors會發現,并沒有拋出異常。這是因為Executors創建BlockingQueue
時沒有指定隊列的容量。
換言之,線程池能容納的任務數量最多為maximumPoolSize
+ queueSize
。 比如線程池如下new ThreadPoolExecutor(10, 11, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(5));
則最大任務數量為16個,超過16個就會拋出異常。
線程池中線程數量有多少呢?先運行如下的代碼:
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadDemo {
static class Worker implements Runnable{
public void run(){
try {
Thread.sleep(1000);
System.out.println("done work "+Thread.currentThread().getName() );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Worker w1 = new Worker();
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(5));
for(int i=0;i<9;i++) {
executor.submit(w1);
}
executor.shutdown();
}
}
可以發現最多開啟了4個線程。 這4個線程就對應了4個Worker的實例。
看worker的源碼可以發現,它兼備AQS和Runnable兩個特性。 我們只關注它Runnable的特性。
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
在這個線程中不斷從隊列中獲取任務,然后執行。 worker中反復出現的ctl
又是什么呢?
ctl是兩個變量組合,一個32位的int, 高3位用于控制線程池的狀態,低29位用于記錄線程池啟動線程的數量。
所以有這么幾個方法
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
整個線程池的核心,就worker
和ctl
的理解。 有點復雜,主要是集中了:
1. AQS
2. BlockingQueue
這也是為什么我建議先學AQS,后學線程池的實現原理。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。