您好,登錄后才能下訂單哦!
本篇內容介紹了“JUC的LinkedBlockingQueue如何實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
由 Blocking 字樣可以推斷出 LinkedBlockingQueue 是阻塞隊列,前面我們介紹過阻塞隊列和非阻塞隊列在實現上的區別,知道阻塞隊列一般是基于鎖機制來保證線程安全,本文我們就一起來分析一下 LinkedBlockingQueue 是如何基于鎖構建線程安全隊列的。
同樣由 Linked 關鍵字我們可以推斷出 LinkedBlockingQueue 底層依賴于鏈表實現,在 LinkedBlockingQueue 的內部實現了一個單鏈表,用以存放隊列元素。其中,結點 Node 類定義如下:
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
其中 next 指針的指向分為 3 種情況:
指向某個具體的后繼結點。
指向自己,意味著后繼結點為 head.next
。
指向 null,說明當前結點是隊列的尾結點,沒有后繼結點。
LinkedBlockingQueue 定義了 head 和 last 指針分別指向隊列的頭結點和尾結點。此外,LinkedBlockingQueue 還定義了如下字段:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable { /** 當前隊列的容量上限 */ private final int capacity; /** 記錄當前隊列的元素個數 */ private final AtomicInteger count = new AtomicInteger(); /** 隊列頭結點 */ transient Node<E> head; /** 隊列尾結點 */ private transient Node<E> last; /** 用于控制 take、poll 等操作,保證同一時間只有一個線程從隊列獲取元素 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 條件隊列,記錄出隊列時因為隊列為空而等待的線程 */ private final Condition notEmpty = takeLock.newCondition(); /** 用戶控制 put、offer 等操作,保證同一時間只有一個線程往隊列添加元素 */ private final ReentrantLock putLock = new ReentrantLock(); /** 條件隊列,記錄入隊列時因為隊列已滿而等待的線程 */ private final Condition notFull = putLock.newCondition(); // ... 省略方法定義 }
由上述字段定義可以看出,LinkedBlockingQueue 限制了隊列的容量上限,并使用 AtomicInteger 類型字段對隊列中的元素個數進行計數。雖然 LinkedBlockingQueue 底層依賴于鏈表實現,理論上是無界的,但是 LinkedBlockingQueue 在實現上卻限制了隊列的容量上限(默認為 Integer.MAX_VALUE
)。
此外,針對出隊列和入隊列操作,LinkedBlockingQueue 分別設置了一把獨占可重入鎖,即 takeLock 和 putLock,從而保證同一時間只有一個線程執行出隊列操作,只有一個線程執行入隊列操作,且出隊列的線程與入隊列的線程彼此之間不相互影響。針對一些阻塞版本的出隊列入隊列方法,如果隊列為空,則出隊列線程會被記錄到條件隊列 notEmpty 中進行等待,如果隊列已滿,則入隊列線程會被記錄到條件隊列 notFull 中進行等待。
BlockingQueue 接口繼承自 Queue 接口,用于描述阻塞隊列。當隊列無法及時響應用戶請求時,例如當我們嘗試從空隊列中獲取元素,或者繼續往已滿的有界隊列中添加元素,BlockingQueue 定義了以下 4 種響應形式:
拋出異常。
立即返回特殊值,例如 null 或 false。
無限期阻塞當前請求,直到隊列狀態變為可用。
超時阻塞當前請求,直到隊列狀態變為可用。
BlockingQueue 接口的定義如下:
public interface BlockingQueue<E> extends Queue<E> { boolean offer(E e); boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; boolean add(E e); void put(E e) throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; E take() throws InterruptedException; boolean remove(Object o); boolean contains(Object o); int remainingCapacity(); int drainTo(Collection<? super E> c); int drainTo(Collection<? super E> c, int maxElements); }
針對各方法的含義說明如下:
offer
:往隊列中添加元素,如果成功則返回 true,對于有界隊列來說,如果隊列已滿則返回 false,而不是拋出異常。BlockingQueue 同時還聲明了超時版本的 offer 方法。
add
:往隊列中添加元素,如果成功則返回 true,對于有界隊列來說,如果隊列已滿則拋出 IllegalStateException 異常。
put
:往隊列中添加元素,對于有界隊列來說,如果隊列已滿則阻塞當前請求,期間支持響應中斷。
poll
:移除隊列頭結點,并返回結點元素值,如果隊列為空則等待指定時間,并在超時時返回 null,期間支持響應中斷。
take
:僅獲取頭結點元素值而不刪除結點,如果隊列為空則阻塞等待,期間支持響應中斷。
remove
:接收一個參數,從隊列中刪除值等于該參數的結點,如果存在多個結點滿足要求,則刪除第一個。
contains
:接收一個參數,判斷隊列中是否存在值等于該參數的結點。
remainingCapacity
:返回隊列的剩余容量,如果是無界隊列,則返回 Integer.MAX_VALUE
。
drainTo
:從隊列中移除所有(或指定個數)結點,并將結點元素放入參數指定的集合中返回,相對于逐個移除更加高效。
LinkedBlockingQueue 實現自 BlockingQueue 接口,下面針對核心方法的實現逐一進行分析。
針對添加元素的操作,LinkedBlockingQueue 實現了 LinkedBlockingQueue#offer
、LinkedBlockingQueue#add
和 LinkedBlockingQueue#put
方法,其中 LinkedBlockingQueue#add
是對 LinkedBlockingQueue#offer
的封裝,并在隊列已滿時拋出 IllegalStateException 異常。
下面主要展開分析 LinkedBlockingQueue#offer
和 LinkedBlockingQueue#put
方法的實現。首先來看一下 LinkedBlockingQueue#offer
方法,實現如下:
public boolean offer(E e) { // 不允許添加 null 元素 if (e == null) { throw new NullPointerException(); } final AtomicInteger count = this.count; // 當前隊列已滿,直接返回 false if (count.get() == capacity) { return false; } int c = -1; // 創建待添加元素對應的結點對象 Node<E> node = new Node<>(e); final ReentrantLock putLock = this.putLock; // 加鎖 putLock.lock(); try { // 再次校驗隊列是否已滿 if (count.get() < capacity) { // 往隊列末端追加結點 this.enqueue(node); // 隊列元素個數計數加 1,并返回添加之前隊列的大小 c = count.getAndIncrement(); // 當前隊列在執行添加操作之后仍然存在空閑位置,嘗試喚醒一個之前因為隊列已滿而等待的線程 if (c + 1 < capacity) { notFull.signal(); } } } finally { // 釋放鎖 putLock.unlock(); } // c == 0 說明隊列中至少存在一個元素(當前添加的),嘗試喚醒一個之前因為隊列為空而等待的線程 if (c == 0) { this.signalNotEmpty(); } return c >= 0; }
與 ConcurrentLinkedQueue 一樣,LinkedBlockingQueue 同樣不允許往其中添加 null 元素。如果隊列已滿,則上述方法會直接返回 false,表示添加失敗,否則創建待添加元素對應的結點對象,并繼續執行:
加鎖,保證同一時間只有一個線程在執行添加操作;
再次校驗隊列是否已滿,如果已滿則跳轉至步驟 5,否則執行 LinkedBlockingQueue#enqueue
方法往隊列末端插入結點;
結點個數計數加 1;
如果在完成本次添加操作之后,隊列仍然未滿,則嘗試喚醒一個之前因為隊列已滿而等待的線程;
釋放鎖;
如果本次成功添加了一個元素,則調用 LinkedBlockingQueue#signalNotEmpty
方法嘗試喚醒一個之前因為隊列為空而等待的線程;
返回。
其中 LinkedBlockingQueue#signalNotEmpty
方法的實現比較簡單,讀者可以參考源碼實現。這里簡單提一下 LinkedBlockingQueue#enqueue
方法,實現如下:
private void enqueue(Node<E> node) { last = last.next = node; }
在 LinkedBlockingQueue 對象被構造出來時,head 和 last 指針均指向一個元素值為 null 的標記結點。由上述方法的實現可以看出當執行入隊列操作時,是將結點賦值給 last 結點的 next 指針,并沒有移除隊列頭部的 null 結點,下文在介紹出隊列操作時返回的都是 head.next
結點元素值,理解了上述插入操作的執行過程也就不會疑惑為什么出隊列時不是直接返回 head 結點的元素值。
LinkedBlockingQueue 還定義了超時版本的 LinkedBlockingQueue#offer(E, long, TimeUnit)
方法,當隊列已滿時,該方法會阻塞等待指定的時間。
下面再來看一下 LinkedBlockingQueue#put
方法,相對于上面介紹的 LinkedBlockingQueue#offer
方法,對于有界隊列而言,如果隊列已滿則該方法將無限期阻塞,方法實現如下:
public void put(E e) throws InterruptedException { // 不允許添加 null 元素 if (e == null) { throw new NullPointerException(); } // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 加鎖,期間支持響應中斷 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is not protected by lock. * This works because count can only decrease at this point (all other puts are shut out by lock), * and we (or some other waiting put) are signalled if it ever changes from capacity. * Similarly for all other uses of count in other wait guards. */ // 隊列已滿,則等待 while (count.get() == capacity) { notFull.await(); } // 執行入隊列操作 this.enqueue(node); // 隊列元素個數計數加 1,并返回添加之前隊列的大小 c = count.getAndIncrement(); // 當前隊列在執行添加操作之后仍然存在空閑位置,嘗試喚醒一個之前因為隊列已滿而等待的線程 if (c + 1 < capacity) { notFull.signal(); } } finally { // 釋放鎖 putLock.unlock(); } // c == 0 說明隊列中至少存在一個元素(當前添加的),嘗試喚醒一個之前因為隊列為空而等待的線程 if (c == 0) { this.signalNotEmpty(); } }
由上述實現可以看出,相對于 LinkedBlockingQueue#offer
方法在隊列已滿時的直接返回,方法 LinkedBlockingQueue#put
會將當前線程添加到條件隊列中等待其它線程釋放隊列空間。
針對獲取元素的操作,LinkedBlockingQueue 實現了 LinkedBlockingQueue#poll
、LinkedBlockingQueue#peek
和 LinkedBlockingQueue#take
方法,其中 LinkedBlockingQueue#peek
方法僅獲取隊列頭結點元素值,而不移除頭結點,實現上比較簡單。下面展開分析 LinkedBlockingQueue#poll
和 LinkedBlockingQueue#take
方法的實現機制。
首先來看一下 LinkedBlockingQueue#poll
方法,LinkedBlockingQueue 針對該方法定義了兩個版本,區別在于當隊列為空時是立即返回還是阻塞等待一段時間,而在實現思路上是一致的。這里以不帶超時參數的版本為例展開分析,實現如下:
public E poll() { final AtomicInteger count = this.count; // 當前隊列為空,直接返回 null if (count.get() == 0) { return null; } E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; // 加鎖 takeLock.lock(); try { // 如果當前隊列不為空 if (count.get() > 0) { // 獲取隊列頭結點元素,并移除頭結點 x = this.dequeue(); // 隊列元素計數值減 1,這里返回的是減 1 之前的值 c = count.getAndDecrement(); // 隊列在執行移除操作后至少還存在一個元素,嘗試喚醒一個之前因為隊列為空而阻塞的線程 if (c > 1) { notEmpty.signal(); } } } finally { // 釋放鎖 takeLock.unlock(); } /* * 之前隊列已滿,但是經過本次 poll 操作之后,至少有一個空閑位置, * 嘗試喚醒一個之前因為隊列已滿而阻塞的線程 */ if (c == capacity) { this.signalNotFull(); } return x; }
如果隊列為空則上述方法會直接返回 null,否則繼續執行:
加鎖,保證同一時間只有一個線程在執行獲取操作;
再次校驗隊列是否為空,如果為空則跳轉至步驟 5,否則執行 LinkedBlockingQueue#dequeue
方法移除隊列頭結點,并返回結點元素值;
結點個數計數減 1;
如果在完成本次移除操作之后,隊列仍然非空,則嘗試喚醒一個之前因為隊列為空而等待的線程;
釋放鎖;
如果本次成功移除了一個元素,則調用 LinkedBlockingQueue#signalNotFull
方法嘗試喚醒一個之前因為隊列已滿而等待的線程;
返回。
其中 LinkedBlockingQueue#signalNotFull
方法的實現比較簡單,讀者可以參考源碼實現。前面我們分析了入隊列 LinkedBlockingQueue#enqueue
方法,下面來看一下出隊列方法,實現如下:
private E dequeue() { Node<E> h = head; Node<E> first = h.next; // 自引用,等待 GC 回收 h.next = h; // help GC head = first; // 獲取真正隊列頭結點的元素值 E x = first.item; // 將隊列頭結點元素值置為 null first.item = null; return x; }
理解了前面入隊列的過程,則上述出隊列的實現也就一目了然,只要清楚隊列的頭結點一直是一個值為 null 的結點,而真正有效的隊列頭結點是該結點的 next 結點。
LinkedBlockingQueue 還定義了超時版本的 LinkedBlockingQueue#poll(long, TimeUnit)
方法,當隊列為空時,該方法會阻塞等待指定的時間。
下面再來看一下 LinkedBlockingQueue#take
方法,相對于上面介紹的 LinkedBlockingQueue#poll
方法,對于有界隊列而言,如果隊列為空則該方法將無限期阻塞,方法實現如下:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 加鎖 takeLock.lockInterruptibly(); try { // 如果隊列為空,則等待 while (count.get() == 0) { notEmpty.await(); } // 獲取隊列頭結點元素,并移除頭結點 x = this.dequeue(); // 隊列元素計數值減 1,這里返回的是減 1 之前的值 c = count.getAndDecrement(); // 隊列在執行移除操作后至少還存在一個元素,嘗試喚醒一個之前因為隊列為空而阻塞的線程 if (c > 1) { notEmpty.signal(); } } finally { // 釋放鎖 takeLock.unlock(); } /* * 之前隊列已滿,但是經過本次 poll 操作之后,至少有一個空閑位置, * 嘗試喚醒一個之前因為隊列已滿而阻塞的線程 */ if (c == capacity) { this.signalNotFull(); } return x; }
由上述實現可以看出,相對于 LinkedBlockingQueue#poll
方法在隊列為空時的直接返回,方法 LinkedBlockingQueue#take
會將當前線程添加到條件隊列中等待其它線程添加新的隊列元素。
針對移除元素的操作,LinkedBlockingQueue 實現了 LinkedBlockingQueue#remove
方法,并提供了有參和無參的版本,其中無參版本實際上是委托給 LinkedBlockingQueue#poll
方法執行的。下面來分析一下有參版本的實現,如下:
public boolean remove(Object o) { if (o == null) { return false; } // 鎖定出隊列、入隊列操作 this.fullyLock(); try { // 從頭開始遍歷隊列 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { // 如果找到第一個目標元素,則移除 if (o.equals(p.item)) { // 移除 p 結點,如果執行移除之后隊列有空閑位置, // 則嘗試喚醒一個之前因為隊列已滿而阻塞的線程 this.unlink(p, trail); return true; } } return false; } finally { // 釋放出隊列、入隊列操作 this.fullyUnlock(); } }
上述方法接收一個參數,并執行刪除元素值等于該參數的結點,如果存在多個滿足條件的結點,則刪除第一個。在執行刪除操作之前會獲取 putLock 和 takeLock 兩把鎖,以防止刪除期間有其它線程執行出隊列或入隊列操作。
最后來看一下 LinkedBlockingQueue#contains
和 LinkedBlockingQueue#size
方法的實現,前者用于檢查隊列是否包含值等于參數的結點,實現如下:
public boolean contains(Object o) { if (o == null) { return false; } // 鎖定出隊列、入隊列操作 this.fullyLock(); try { // 從頭開始遍歷鏈表,并逐一比對 for (Node<E> p = head.next; p != null; p = p.next) { if (o.equals(p.item)) { return true; } } return false; } finally { // 釋放出隊列、入隊列操作 this.fullyUnlock(); } }
方法 LinkedBlockingQueue#size
用于返回隊列的結點個數,前面已經介紹了 LinkedBlockingQueue 定義了一個 AtomicInteger 類型的字段用于計數隊列的結點個數,所以 LinkedBlockingQueue#size
方法能夠精確的返回,且幾乎沒有性能開銷,同時在實現上非常簡單,如下:
public int size() { return count.get(); }
“JUC的LinkedBlockingQueue如何實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。