您好,登錄后才能下訂單哦!
這篇文章主要講解了“如何實現java簡單的線程池”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何實現java簡單的線程池”吧!
拆分實現流程
實現方式
1.拒絕策略
2.阻塞隊列
3.線程池和工作線程
策略模式
對比JDK的線程池
線程池的狀態轉化
請看下面這張圖
首先我們得對線程池進行一個功能拆分
Thread Pool 就是我們的線程池,t1,t2,t3代表三個線程
Blocking Queue代表阻塞隊列
main代表main方法的線程
task1,task2,task3代表要執行的每個任務
現在我們梳理一下執行的流程,注意這里是簡略版的,文章后面我會給出詳細版的
所以此時,我們發現了需要創建幾個類,或者說幾個角色,分別是
線程池
工作線程
阻塞隊列
拒絕策略(干嘛的?就是當線程數已經滿了,并且阻塞隊列也滿了,還有任務想進入阻塞隊列的時候,就可以拒絕這個任務)
/** * 拒絕策略 */ @FunctionalInterface interface RejectPolicy<T>{ //queue就是我們自己實現的阻塞隊列,task是任務 void reject(BlockingQueue<T> queue,T task); }
我們需要實現四個方法,獲取和添加,超時獲取和超時添加,至于方法實現的細節,我都備注了大量的注釋進行解釋。
/** * 阻塞隊列 */ class BlockingQueue<T>{ //阻塞隊列 private Deque<T> queue = new ArrayDeque<>(); //鎖 private ReentrantLock lock = new ReentrantLock(); //生產者條件變量 private Condition fullWaitSet = lock.newCondition(); //消費者條件變量 private Condition emptyWaitSet = lock.newCondition(); //容量 private int capacity; public BlockingQueue(int capacity){ this.capacity = capacity; } //帶有超時阻塞獲取 public T poll(long timeout, TimeUnit timeUnit){ lock.lock(); try { //將timeout統一轉換為納秒 long nanos = timeUnit.toNanos(timeout); while(queue.isEmpty()){ try { if(nanos <= 0){ //小于0,說明上次沒有獲取到,代表已經超時了 return null; } //返回值是剩余的時間 nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); //通知生產者 fullWaitSet.signal(); return t; }finally { lock.unlock(); } } //阻塞獲取 public T take(){ lock.lock(); try{ while(queue.isEmpty()){ //如果任務隊列為空,代表線程池沒有可以執行的內容 try { /* 也就說此時進來的線程是執行不了任務的,所以此時emptyWaitSet消費者要進行阻塞狀態 等待下一次喚醒,然后繼續判斷隊列是否為空 */ emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } /* 代碼執行到這里。說明任務隊列不為空,線程池就從任務隊列拿出一個任務出來執行 也就是說把阻塞隊列的一個任務出隊 */ T t = queue.removeFirst(); /* 然后喚醒之前存放在生成者Condition休息室,因為由于之前阻塞隊列已滿,fullWaitSet才會進入阻塞狀態 所以當阻塞隊列刪除了任務,就要喚醒之前進入阻塞狀態的fullWaitSet */ fullWaitSet.signal(); //返回任務 return t; }finally { lock.unlock(); } } //阻塞添加 public void put(T task){ lock.lock(); try { while(queue.size() == capacity){ //任務隊列滿了 try { System.out.println("等待加入任務隊列"+task); /* 也就說此時進來的任務是進不了阻塞隊列的,已經滿了,所以此時生產者Condition要進入阻塞狀態 等待下一次喚醒,然后繼續判斷隊列是否為空 */ fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //任務隊列還未滿 System.out.println("加入任務隊列"+task); //把任務加入阻塞隊列 queue.addLast(task); /* 然后喚醒之前存放在消費者Condition休息室,因為由于之前阻塞隊列為空,emptyWaitSet才會進入阻塞狀態 所以當阻塞隊列加入了任務,就要喚醒之前進入阻塞狀態的emptyWaitSet */ emptyWaitSet.signal(); }finally { lock.unlock(); } } //帶超時阻塞時間添加 public boolean offer(T task,long timeout,TimeUnit timeUnit){ lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while(queue.size() == capacity){ try { if(nanos < 0){ return false; } System.out.println("等待加入任務隊列"+task); //不會一直阻塞,超時就會繼續向下執行 nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("加入任務隊列"+task); queue.addLast(task); emptyWaitSet.signal(); return true; }finally { lock.unlock(); } } //獲取任務數量 public int size(){ lock.lock(); try{ return queue.size(); }finally { lock.unlock(); } } //嘗試添加任務,如果阻塞隊列已經滿了,就使用拒絕策略 public void tryPut(RejectPolicy<T> rejectPolicy, T task){ lock.lock(); try { //判斷隊列是否已滿 if(queue.size() == capacity){ rejectPolicy.reject(this,task); }else{ //有空閑 System.out.println("加入任務隊列"+task); queue.addLast(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } }
我把工作線程當成線程池的內部類去實現。方便調用變量。
/** * 線程池 */ class ThreadPool{ //阻塞隊列 private BlockingQueue<Runnable> taskQueue; //線程集合 private HashSet<Worker> workers = new HashSet<>(); //核心線程數 private int coreSize; //獲取任務的超時時間 private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity,RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapacity); this.rejectPolicy = rejectPolicy; } //執行任務 public void execute(Runnable task){ synchronized (workers){ if(workers.size() <= coreSize){ //當前的線程數小于核心線程數 Worker worker = new Worker(task); workers.add(worker); //讓線程開始工作,執行它的run方法 worker.start(); }else{ // 1) 死等 // 2) 帶超時等待 // 3) 讓調用者放棄任務執行 // 4) 讓調用者拋出異常 // 5) 讓調用者自己執行任務 taskQueue.tryPut(rejectPolicy,task); } } } /** * 工作線程,也就是線程池里面的線程 */ class Worker extends Thread{ private Runnable task; public Worker(Runnable task){ this.task = task; } @Override public void run() { //執行任務 // 1) 當 task 不為空,執行任務 // 2) 當 task 執行完畢,再接著從任務隊列獲取任務并執行 while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { System.out.println("正在執行的任務" + task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { //代表這個任務已經執行完了 task = null; } } synchronized (workers) { System.out.println("worker 被移除" + this); workers.remove(this); } } } }
細心的小伙伴已經發現,我在拒絕策略這里使用了23種設計模式的策略模式,因為我沒有將拒絕的方式寫死,而是交給了調用者去實現。
下面是JDK自帶的線程池
經典的七大核心參數
corePoolSize:核心線程數
queueCapacity:任務隊列容量(阻塞隊列)
maxPoolSize:最大線程數
keepAliveTime:線程空閑時間
TimeUnit unit:超時時間單位
ThreadFactory threadFactory:線程工程
rejectedExecutionHandler:任務拒絕處理器
實際上我們自己實現的也大同小異,只不過JDK官方的更為復雜。
JDK線程執行的流程圖
線程我們知道在操作系統層面有5種狀態
初始狀態:僅是在語言層面創建了線程對象,還未與操作系統線程關聯
可運行狀態(就緒狀態):指該線程已經被創建(與操作系統線程關聯),可以由 CPU 調度執行
運行狀態:指獲取了 CPU 時間片運行中的狀態,當 CPU 時間片用完,會從【運行狀態】轉換至【可運行狀態】,會導致線程的上下文切換
阻塞狀態
如果調用了阻塞 API,如 BIO 讀寫文件,這時該線程實際不會用到 CPU,會導致線程上下文切換,進入【阻塞狀態】
等 BIO 操作完畢,會由操作系統喚醒阻塞的線程,轉換至【可運行狀態】
與【可運行狀態】的區別是,對【阻塞狀態】的線程來說只要它們一直不喚醒,調度器就一直不會考慮調度它們
終止狀態:表示線程已經執行完畢,生命周期已經結束,不會再轉換為其它狀態
線程在Java API層面有6種狀態
NEW 線程剛被創建,但是還沒有調用 start() 方法
RUNNABLE 當調用了 start() 方法之后,注意,Java API 層面的
RUNNABLE 狀態涵蓋了 操作系統 層面的【可運行狀態】、【運行狀態】
BLOCKED , WAITING , TIMED_WAITING 都是 Java API 層面對【阻塞狀態】的細分
TERMINATED 當線程代碼運行結束
線程池有5種狀態
RUNNING:能接受新任務,并處理阻塞隊列中的任務
SHUTDOWN:不接受新任務,但是可以處理阻塞隊列中的任務
STOP:不接受新任務,并且不處理阻塞隊列中的任務,并且還打斷正在運行任務的線程,就是直接不干了!
TIDYING:所有任務都終止,并且工作線程也為0,處于關閉之前的狀態
TERMINATED:已關閉。
感謝各位的閱讀,以上就是“如何實現java簡單的線程池”的內容了,經過本文的學習后,相信大家對如何實現java簡單的線程池這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。