您好,登錄后才能下訂單哦!
Java中怎么使用BlockingQueue實現并發,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
阻塞隊列(BlockingQueue)是一個支持兩種附加操作的隊列。支持附加阻塞的插入和移除操作。
支持阻塞的插入:當隊列滿時,插入操作會被阻塞,直到隊列不滿。
支持阻塞的移除:當隊列空時,移除操作會被阻塞,直到隊列不空。
阻塞隊列不可用時,操作處理方式
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
檢查方法 | element() | peek() | 無 | 無 |
拋出異常:隊列滿時,若繼續插入元素會拋出IllegalStateException
;當隊列為空時,若獲取元素則會拋出NoSuchElementException
異常。
返回特殊值:向隊列插入元素時,會返回是否插入成功true/false;獲取元素時,成功則返回元素,失敗則返回null。
一直阻塞:當阻塞隊列滿時,若繼續使用put新增元素時會被阻塞,直到隊列不為空或者響應中斷退出;當阻塞隊列為空時,繼續使用take獲取元素時會被阻塞,直到隊列不為空。
超時退出:當阻塞隊列滿時,使用offer(e, time, unit)新增元素會被阻塞至超時退出;當隊列為空時,使用poll(time, unit)獲取元素時會被阻塞至超時退出。
注意:
阻塞隊列中不允許插入null
,會拋出NPE異常。
可以訪問阻塞隊列中的任意元素,調用remove(Object o)
可以將隊列之中的特定對象移除,但會遍歷全部元素,并不高效。
由數組構成的有界阻塞隊列,內部由數組final Object[] items
實現。默認情況下不保證線程公平的訪問隊列,所謂公平訪問隊列指阻塞的線程,可以按照阻塞的先后順序訪問隊列。
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 使用公平鎖/非公平鎖 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
隊列大小初始化后不可修改。參數fair
控制內部ReentrantLock
是否采用公平鎖。
鏈表實現的有界阻塞隊列。內部結構是單鏈表。默認大小為Integer.MAX_VALUE
,可以指定大小。
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); // 指定隊列大小 this.capacity = capacity; last = head = new Node<E>(null); } // 單鏈表節點Node static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
支持優先級的無界阻塞隊列。默認情況下采取自然順序升序排列。也可以自定義compareTo()
方法來指定元素的排列順序,或者初始化隊列時,指定構造參數Comparator
來對元素進行排序。同優先級順序無法保證。
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); // 非公平鎖 this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } // offer方法部分代碼 Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp);
由offer代碼可以看出,Comparator
的優先級是大于Comparable.compareTo
方法的。
注意:PriorityBlockingQueue
不會阻塞數據生產者(隊列無界),只會在沒有數據時阻塞消費者。生產者生產數據的速度絕對不能快于消費者消費數據的速度,否則將有可能耗盡堆空間。
支持延時獲取元素的無界隊列。隊列使用PriorityQueue
實現。隊列中的元素必須實現java.util.concurrent.Delayed
接口,在創建元素時指定多久才能才能從隊列中取到元素。
DelayQueue非常有用,可以將DelayQueu應用在以下應用場景。
緩存系統的設計:用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能獲取到元素時,表示緩存有限期到了。
定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行。比如TimerQueue
就是使用DelayQueue實現的。
不存儲元素的阻塞隊列。每個put
操作都必須等待一個take
操作,反之亦然。
// fair為true,等待線程將以FIFO的順序進行訪問 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
將生產者線程處理的數據直接傳遞給消費者線程。隊列本身不存儲任何元素,非常適合傳遞性場景。SynchronousQueue
的吞吐量高于ArrayBlockingQueue
和LinkedBlockingQueue
。
利用Lock
鎖的多條件(Condition)阻塞控制。下面簡單分析下ArrayBlockingQueue
部分代碼。
/** The queued items */ // 數據元素數組 final Object[] items; /** items index for next take, poll, peek or remove */ // 下一個待獲取元素索引 int takeIndex; /** items index for next put, offer, or add */ // 下一個待插入元素索引 int putIndex; /** Number of elements in the queue */ // 隊列中元素個數 int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ // 所有訪問的主鎖 final ReentrantLock lock; /** Condition for waiting takes */ // 消費者監視器 private final Condition notEmpty; /** Condition for waiting puts */ // 生產者監視器 private final Condition notFull; // public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
// 在隊列尾部插入元素,若隊列已滿則等待隊列非滿。 public void put(E e) throws InterruptedException { // 校驗插入元素,為空則拋出NPE checkNotNull(e); final ReentrantLock lock = this.lock; // 1. 嘗試獲取鎖(響應中斷) lock.lockInterruptibly(); try { // 2. 當隊列滿時 while (count == items.length) // 2.1 若隊列滿,則阻塞當前線程。等待`notFull.signal()`喚醒。 notFull.await(); // 3. 非滿則執行入隊操作 enqueue(e); } finally { lock.unlock(); } } // 在`putIndex`處放置當前元素,只有獲取lock鎖后才會調用 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 在`putIndex`處放置元素 items[putIndex] = x; // putIndex等于數組長度時,重置為0索引。 if (++putIndex == items.length) putIndex = 0; // 數量加1 count++; // 4. 喚醒一個等待線程(等待取元素的線程) notEmpty.signal(); }
put總體流程:
獲取lock鎖,拿到鎖后繼續執行,否則自旋競爭鎖。
判斷阻塞隊列是否滿。滿了了則調用await
阻塞當前線程。同時釋放lock鎖。
如果沒滿,則調用enqueue
方法將元素put進阻塞隊列。此時還有一種可能是:第2步中被阻塞的線程被喚醒且又拿到了lock鎖。
喚醒一個標記為notEmpty(消費者)
的線程。
// 從頭部獲取元素,若隊列為空則等待隊列非空。 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 1. 獲取鎖 lock.lockInterruptibly(); try { // 2. 當隊列為空時 while (count == 0) // 2.1 當隊列為空時,阻塞當前線程。等待`notEmpty.signal()`喚醒。 notEmpty.await(); // 3. 非空則進行入隊操作 return dequeue(); } finally { lock.unlock(); } } // 從`takeIndex`位置獲取當前元素,只有獲取到lock鎖后才會調用 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 從`takeIndex`位置獲取元素,然后清除該位置元素 E x = (E) items[takeIndex]; items[takeIndex] = null; // if (++takeIndex == items.length) takeIndex = 0; // 隊列元素減1 count--; if (itrs != null) itrs.elementDequeued(); // 4. 喚醒一個標記為notFull(生產者)的線程 notFull.signal(); return x; }
take的整體流程:
獲取lock鎖,拿到鎖則執行下一步流程;未拿到則自旋競爭鎖。
當前隊列是否為空,若為空則調用notEmpty.await
阻塞當前線程,同時釋放鎖,等待被喚醒。
若非空,則調用dequeue
進行出隊操作。此時還有一種可能:第2步中的阻塞的線程被喚醒并且又拿到了lock鎖。
喚醒一個被標記為notFull(生產者)的線程。
put
和take
操作都需要先獲得鎖,沒有獲得鎖的線程無法進行操作。
拿到鎖后,并不一定能順利執行put
/take
操作,還需要判斷隊列是否可用(是否滿/空),不可用則會被阻塞,并釋放鎖。
在2中被阻塞的線程會被喚醒,但喚醒之后依然需要拿到鎖之后才能繼續向下執行。否則,自旋拿鎖,拿到鎖后再while判斷隊列是否可用。
看完上述內容,你們掌握Java中怎么使用BlockingQueue實現并發的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。