您好,登錄后才能下訂單哦!
這篇文章主要講解了“JUC的ConcurrentLinkedQueue如何使用”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“JUC的ConcurrentLinkedQueue如何使用”吧!
ConcurrentLinkedQueue 是線程安全的無界非阻塞隊列。在 JUC 包中,線程安全的隊列按照實現方式可以分為阻塞隊列和非阻塞隊列兩大類,前者基于鎖來保證線程安全,而后者則基于 CAS 機制保證線程安全,阻塞隊列一般在類名中都帶有 Blocking 的字樣。
由 Linked 關鍵字我們可以推斷出 ConcurrentLinkedQueue 底層依賴于鏈表實現,在 ConcurrentLinkedQueue 的內部實現了一個單鏈表,用以存放隊列元素。其中,結點 Node 類定義如下:
private static class Node<E> { /** 結點元素值 */ volatile E item; /** 指針,指向下一個結點 */ volatile Node<E> next; // ... 省略方法定義 }
Node 類是一個典型的鏈表結點定義,此外,Node 類還定義了一些方法用于修改結點的元素值和 next 指針,這些方法均基于 Unsafe 實現。ConcurrentLinkedQueue 的 head 和 tail 字段分別指向隊列的頭結點和尾結點,如下:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, Serializable { private transient volatile Node<E> head; private transient volatile Node<E> tail; public ConcurrentLinkedQueue() { head = tail = new Node<>(null); } // ... 省略方法實現 }
當我們構造一個空的 ConcurrentLinkedQueue 對象時,鏈表的 head 和 tail 均指向一個元素值為 null 的標記結點。在 ConcurrentLinkedQueue 中不允許添加 null 元素,因為值為 null 的結點在 ConcurrentLinkedQueue 中扮演著特殊的角色。
Queue 接口繼承自 Collection 接口,增加了隊列相關的操作,定義如下:
public interface Queue<E> extends Collection<E> { boolean add(E e); boolean offer(E e); E poll(); E peek(); E element(); E remove(); }
針對各方法的含義說明如下:
add
:往隊列中添加元素,如果成功則返回 true,對于有界隊列來說,如果隊列已滿則拋出 IllegalStateException 異常。
offer
:往隊列中添加元素,如果成功則返回 true,對于有界隊列來說,如果隊列已滿則返回 false,而不是拋出異常。
poll
:移除隊列頭結點,并返回結點元素值,如果隊列為空則返回 null。
peek
:僅獲取頭結點元素值而不刪除結點,如果隊列為空則返回 null。
element
:僅獲取頭結點元素值而不刪除結點,如果隊列為空則拋出 NoSuchElementException 異常。
remove
:移除隊列頭結點,并返回結點元素值,如果隊列為空則拋出 NoSuchElementException 異常。
ConcurrentLinkedQueue 實現自 Queue 接口,下面來分析一下其針對 Queue 中聲明的核心操作方法的實現。
ConcurrentLinkedQueue 實現了 Queue 接口中聲明的往隊列中添加元素方法,即 Queue#add
和 Queue#offer
。這兩個方法都是往隊列末端追加元素,因為 ConcurrentLinkedQueue 沒有容量上的限制,所以這兩個方法也就不存在隊列已滿的問題。所以,對于 ConcurrentLinkedQueue 而言,這兩個方法在實現上并沒有區別。
下面來看一下 ConcurrentLinkedQueue 針對 Queue#offer
的實現,如下:
public boolean offer(E e) { // 待添加元素不能為 null checkNotNull(e); // 創建待添加元素對應的 Node 結點 final Node<E> newNode = new Node<>(e); // 添加到尾結點 for (Node<E> t = tail, p = t; ; ) { Node<E> q = p.next; if (q == null) { // 1 // p 已經是尾結點,基于 CAS 設置結點 newNode 為 p 的 next 結點 if (p.casNext(null, newNode)) { /* * Successful CAS is the linearization point for e to become an element of this queue, * and for newNode to become "live". */ if (p != t) { // hop two nodes at a time // 更新 tail 結點 this.casTail(t, newNode); // Failure is OK. } return true; } // Lost CAS race to another thread; re-read next } else if (p == q) { // 2 /* * We have fallen off list. If tail is unchanged, it will also be off-list, * in which case we need to jump to head, from which all live nodes are always reachable. * Else the new tail is a better bet. */ p = (t != (t = tail)) ? t : head; } else { // 3 // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } } }
對于待添加的元素,上述方法首先會判斷是否為 null,前面已經提及過 ConcurrentLinkedQueue 不允許向其中添加 null 元素,這主要是因為元素值為 null 的結點在 ConcurrentLinkedQueue 中是一個特殊的標記結點。如果待添加元素不為 null,則上述方法會將元素包裝成 Node 結點(令該結點為 N)添加到隊列的末端。
下面通過圖示演示各種不同的元素添加場景,本小節中均使用青色表示 head 結點,使用橘黃色表示 tail 結點。假設當前隊列的元素構成如下圖 (1) 所示,此時 q 結點為 null(說明:本文中所有虛線表示的結點均指代結點本身為 null,而非結點元素值為 null),即運行到上述代碼 1 位置。需要基于 CAS 操作將 p 的 next 結點由 null 修改為 N 結點,成功則返回 true。此時 p 不等于 t,操作完成之后隊列的元素構成如下圖 (2) 所示。
考慮上述過程存在多個線程競爭,假設現在有兩個線程 A 和 B,其中 A 在執行代碼 1 中的 Node#casNext
時成功將 p 的 next 結點由 null 更新為 node1 結點,如下圖 (1) 所示。此時線程 B 再執行 Node#casNext
企圖將 p 的 next 結點由 null 更新為 node2 結點時將失敗,因為 p 的 next 結點此時已經不為 null,所以線程 B 將進入下一輪 for 循環,但此時 q 已經不為 null,且不等于 p,所以進入代碼 3,這一步的運行結果就是將 q 賦值給 p,如下圖 (2) 所示。接著線程 B 繼續進入下一輪 for 循環,執行 Node<E> q = p.next;
,如下圖 (3) 所示。因為此時 q 等于 null,所以繼續執行代碼 1 將 p 的 next 結點由 null 修改為 node2,如下圖 (4) 所示。但此時的 p 不等于 t,所以需要執行 ConcurrentLinkedQueue#casTail
更新 tail 結點,如下圖 (5) 所示。
最后再來分析一下什么情況下會執行到代碼 2。假設當前隊列的元素構成如下圖 (1) 所示,此種結構一般由其它線程執行 poll 方法所造成,下一小節會進行分析。此時 tail 結點形成了自引用,開始執行 for 循環時 p 和 t 均指向 tail 結點,當將 p 的 next 結點賦值給 q 時,因為 p 的 next 結點即為 tail 結點自己,所以 q 也指向 tail 結點。此時,q 結點不為 null,且 p 等于 q,所以執行代碼 2 將 head 結點賦值給 p,如下圖 (2) 所示。所以這一步的目的在于跳出自引用,后續的執行流程參考下圖 (3)、(4) 和 (5),讀者可以自行梳理。
除了上面介紹的 ConcurrentLinkedQueue#offer
方法,ConcurrentLinkedQueue 還實現了 ConcurrentLinkedQueue#add
方法同樣用于往隊列末端追加元素,不過因為 ConcurrentLinkedQueue 是無界隊列,所以該方法也只是簡單的將請求委托給 ConcurrentLinkedQueue#offer
方法執行。
針對獲取元素的操作,Queue 接口聲明了 3 個方法,包括 Queue#poll
、Queue#peek
,以及 Queue#element
。其中 Queue#poll
和 Queue#peek
的區別一般都比較熟悉,而 Queue#element
方法在功能上與 Queue#peek
方法類似,都是獲取隊列的頭結點元素值而不刪除結點,區別在于當隊列為空時,方法 Queue#peek
返回 null,而 Queue#element
則拋出異常。ConcurrentLinkedQueue 針對 Queue#element
方法的實現實際上也是委托給 ConcurrentLinkedQueue#peek
方法執行的,只是對該方法的返回值進行了處理,如果返回值為 null 則拋出 NoSuchElementException 異常。
下面首先來看一下 ConcurrentLinkedQueue 針對方法 Queue#poll
的實現,如下:
public E poll() { restartFromHead: for (; ; ) { // 從頭結點獲取元素 for (Node<E> h = head, p = h, q; ; ) { E item = p.item; // 如果當前頭結點不為 null,則嘗試基于 CAS 將其修改為 null if (item != null && p.casItem(item, null)) { // 1 // CAS 操作成功 // Successful CAS is the linearization point for item to be removed from this queue. if (p != h) { // hop two nodes at a time this.updateHead(h, ((q = p.next) != null) ? q : p); } return item; } // 當前隊列為空 else if ((q = p.next) == null) { // 2 this.updateHead(h, p); return null; } else if (p == q) { // 3 continue restartFromHead; } else { // 4 p = q; } } } } final void updateHead(Node<E> h, Node<E> p) { // 將頭結點由 h 更新為 p if (h != p && this.casHead(h, p)) { // 更新 h 的 next 結點為 h 自己 h.lazySetNext(h); } }
上述方法使用了 continue 標簽語法以控制代碼的執行邏輯,其中標簽名為 restartFromHead,此外,break 關鍵字同樣支持標簽語法,與 continue 一起實現類似 goto 關鍵字的功能。
下面同樣通過圖示演示各種不同的獲取元素場景,本小節中均使用青色表示 head 結點,使用橘黃色表示 tail 結點。假設當前隊列的元素構成如下圖 (1) 所示,此時 p 和 h 均指向 head 結點,而 q 結點未賦值,所以暫未標出。此時 p 所指向的結點元素值不為 null,所以嘗試執行 Node#casItem
方法基于 CAS 修改結點元素值為 null,即運行上述代碼 1。假設當前線程 CAS 操作成功,如下圖 (2) 所示,因為此時 p 等于 h,所以直接返回結點元素值,即出隊列成功。
繼續演示一些其它情況,假設現在隊列的頭結點元素值為 null,所以直接跳轉到代碼 2 執行,將 q 賦值為 p 的 next 結點,如下圖 (1) 所示,但是因為結點不為 null,所以繼續往下執行。此時 p 不等于 q,所以執行代碼 4 將 q 賦值給 p,如下圖 (2) 所示,然后進入下一輪循環。
此時結點 p 的元素值不為 null,所以進入代碼 1。考慮存在多個線程競爭的場景,假設現在有兩個線程 A 和 B,其中 A 在執行代碼 1 中的 Node#casItem
時成功將 p 的元素值更新為 null,如下圖 (3-1) 所示。因為此時 p 不等于 h,所以執行 ConcurrentLinkedQueue#updateHead
方法將頭結點由 h 更新為 p,并重定向 h 的 next 指針指向自己,如下圖 (4-1) 所示。最后返回結點元素值,即出隊列成功。
因為線程 A 已經操作成功,所以線程 B 在執行 Node#casItem
方法時必然失敗,于是繼續向下執行代碼 2,將 q 指向 p 的 next 結點,如上圖 (3-2) 所示。因為此時 q 結點為 null,所以執行 ConcurrentLinkedQueue#updateHead
方法將頭結點由 h 更新為 p,并重定向 h 的 next 指針指向自己,如上圖 (4-2) 所示。最后返回 null 值,即出隊列成功。
最后再來分析一下什么情況下會執行到代碼 3。假設當前隊列的元素構成如下圖 (1) 所示,并且當前有兩個線程(A 和 B)競爭執行出隊列操作,線程 A 首先執行代碼 1 基于 CAS 將結點 p 元素值修改為 null,如下圖 (2) 所示。但在線程 A 繼續執行代碼 1 中的 ConcurrentLinkedQueue#updateHead
方法嘗試更新 head 結點之前,B 線程進入了 for 循環,如下圖 (3) 所示,此時 B 線程的 h 和 p 指針均指向 head 結點,但是在 B 繼續向下執行之前,A 線程執行了 ConcurrentLinkedQueue#updateHead
方法,將 head 結點由 h 更新為 p,并修改 h 的 next 指針指向自己,最后返回元素值,如下圖 (4) 所示。
此時,如上圖 (5) 所示,B 線程再繼續執行時發現 p 結點元素值為 null,所以跳轉執行代碼 2 將 p 的 next 結點賦值給 q,如上圖 (6) 所示。因為此時 p 結點不為 null,且是自引用,所以 p 也就等于 q,繼續執行代碼 3 跳出本次 for 循環從頭再來。再次進入 for 循環時,B 線程看到的隊列結構就變成了如上圖 (7) 所示。
本小節的最后一起來看一下 ConcurrentLinkedQueue#peek
方法的實現,相對于 ConcurrentLinkedQueue#poll
方法,該方法的區別在于僅獲取隊頭元素,而不移除頭結點。方法實現如下:
public E peek() { restartFromHead: for (; ; ) { for (Node<E> h = head, p = h, q; ; ) { E item = p.item; if (item != null || (q = p.next) == null) { // 1 this.updateHead(h, p); return item; } else if (p == q) { // 2 continue restartFromHead; } else { // 3 p = q; } } } }
上述實現初看起來似乎不太好理解,但是如果像上面一樣結合圖示去分析就會一目了然,這里就不再繼續演示各個步驟的執行邏輯,感興趣的讀者可以自己動手畫一下。
針對刪除元素操作,Queue 僅聲明了 Queue#remove
方法,用于刪除隊列頭結點并返回結點元素值,區別于 Queue#poll
方法返回 null 值,當隊列為空時該方法將拋出異常。ConcurrentLinkedQueue 還實現了帶參數的 remove 方法(繼承自 Collection 接口),該方法用于從當前隊列中刪除目標元素值對應的結點,如果存在多個則刪除第 1 個。下面主要來看一下帶參數版本的 remove 方法實現,如下:
public boolean remove(Object o) { if (o != null) { Node<E> next, pred = null; // 遍歷隊列中的元素 for (Node<E> p = this.first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; if (item != null) { // 如果當前遍歷元素不是期望刪除的元素,則繼續獲取后繼結點 if (!o.equals(item)) { next = this.succ(p); continue; } // 當前遍歷元素是期望刪除的元素,基于 CAS 將該結點置為 null removed = p.casItem(item, null); } /* * 指向到這里分為兩種情況: * 1. 當前結點為 null。 * 2. 當前結點為待刪除結點。 */ // 獲取當前結點的后繼結點 next = this.succ(p); // 更新前驅結點的 next 指針指向當前結點的后繼結點 if (pred != null && next != null) { // unlink pred.casNext(p, next); } if (removed) { return true; } } } return false; }
刪除的過程從隊列頭部開始遍歷,并在遇到待刪除元素時基于 CAS 將對應結點元素值更新為 null,在遍歷過程中會剔除掉所有元素值為 null 的結點。
ConcurrentLinkedQueue 提供了 ConcurrentLinkedQueue#size
方法用于獲取隊列的長度,該方法在實現上會從頭開始對隊列進行遍歷,并計數元素值不為 null 的結點,并以 Integer.MAX_VALUE
作為計數值上界。需要注意的一點是不同于一般的集合,ConcurrentLinkedQueue 整個計算隊列大小的過程時間復雜度為 O(n)
,并且結果是不準確的。如果期間有其它線程對隊列執行增刪操作,將不會在 ConcurrentLinkedQueue#size
方法的返回值中體現。
方法 ConcurrentLinkedQueue#contains
用于判斷隊列中是否包含參數指定的元素值,在實現上與 ConcurrentLinkedQueue#size
方法思想想通,都需要從頭開始遍歷隊列并對元素值進行比對。
感謝各位的閱讀,以上就是“JUC的ConcurrentLinkedQueue如何使用”的內容了,經過本文的學習后,相信大家對JUC的ConcurrentLinkedQueue如何使用這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。