您好,登錄后才能下訂單哦!
[TOC]
ConcurrentlinkedQueue 還是一個基于鏈表的,×××的,線程安全的單端隊列,它采用先進先出(FIFO)的規則對節點進行排序,當我們加入一個元素時,它會插入隊列的尾部,當我們獲取元素時,會從隊列的首部獲取元素。它沒有使用鎖來保證線程安全,使用的是“wait-free”算法來保證整個隊列的線程安全。
// 存儲的數據
volatile E item;
// 下一個節點引用
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
// 構造一個node節點
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
// 修改節點的item
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 懶修改節點的next
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// cas修改節點的next節點
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
構造節點是其實就是構造了一個node的item為null的節點,然后head和tail指向這個節點,如下圖所示:
public boolean add(E e) {
return offer(e);
}
我們可以看出其實調用的是offer方法,具體參考offer方法的講解。
源碼解析:
public boolean offer(E e) {
// 入隊元素不能為null
checkNotNull(e);
// 創建新的節點
final Node<E> newNode = new Node<E>(e);
// 死循環,設置節點
// p獲取尾節點
for (Node<E> t = tail, p = t;;) {
// q是p的next節點
Node<E> q = p.next;
// 獲取尾節點的next節點
// 尾節點沒有下一個節點
if (q == null) {
// p is last node
// 這一步說明p是尾節點,新的節點設置為尾節點的next節點
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
// 設置尾節點,當之前的尾節點和現在插入的節點之間有一個節點時
// 并不是每一次都cas設置尾節點(優化手段,是怎么想到這種優化的??)
if (p != t) // hop two nodes at a time
// cas設置尾節點,可能會失敗
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 多線程操作時候,由于poll時候會把舊的head變為自引用,然后將head的next設置為新的head
// 所以這里需要重新找新的head
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// 尋找尾節點
// Check for tail updates after two hops.
// p!=t
p = (p != t && t != (t = tail)) ? t : q;
}
}
分析插入過程,我們插入使用3個線程來調用offer 方法,ThreadA,ThreadB同時運行,ThreadC最后插入,分析下offer方法的流程。
第一步,隊列屬于初始化狀態,ThreadA,ThreadB同時調用offer方法;創建節點,死循環設置節點,獲取尾節點的next節點,此時q== null,兩個線程都同時可能看見,然后cas設置尾節點的next節點(隊列狀態如圖A所示),我們假設是ThreadA線程cas設置成功了,然后p==t此時的尾節點其實沒有發生變化;此時我們來看ThreadB由于A成功了,所以ThreadB cas失敗了,重新循環,此時q != null了,p == q顯然不等于,再看下一個else判斷p!=t,此時顯然p == t,所以才是p = q,然后再次循環,此時的q==null,我們假設沒有線程來和ThreadB競爭,所以cas設置成功,然后p!=t嗎,顯然滿足所以設置尾節點,此時的設置尾節點的節點和之前的尾節點之間剛剛好有一個節點(如圖B所示)。
第二步,ThreadC插入,此時的尾節點是ThreadB插入的節點假設是B,獲取B的next節點,q == null,然后cas設置節點,完成,p==t,所以不用更新尾節點(如圖C所示)。
注意:不會刪除元素,要和poll方法區別
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
// 更新頭結點
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
public E poll() {
restartFromHead:
// 循環
for (;;) {
// 獲取頭結點
for (Node<E> h = head, p = h, q;;) {
// 獲取節點的內容
E item = p.item;
// item不為null ,使用cas設置item為空
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// 更新頭結點,和尾節點一樣,不是每次都更新
// 頭結點item為null是,下個節點就必須更新頭結點
// 頭結點item不為null時,規則和更新尾節點一樣
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 那么獲取p節點的下一個節點,如果p節點的下一節點為null,則表明隊列已經空了
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// p == q,說明別的線程調用了updateHead,
// 自己的next 指向了自己,重新循環,獲取最新的頭結點
else if (p == q)
continue restartFromHead;
// 如果下一個元素不為空,則將頭節點的下一個節點設置成頭節點
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
分析我們按照offer時候元素來執行poll方法,ThreadD和ThreadE同時執行來分析下隊列的變化(主要分析p==q的產生)。
初始狀態(如圖C所示)
第一步,ThreadD和ThreadE執行poll操作,item等于null,所以執行執行下面的操作(q = p.next) == null不等于,p == q不等于,所以p = q,其實就是上圖的ThreadA插入的節點,此時的item已經不為null了,所以執行cas設置item為null的操作,假設ThreadD執行成功了,那么此時p!=h就滿足了,所以此時要更新頭結點調用updateHead,這個方法會更新頭結點,并且把原來的頭節點的next設置為自己)(如圖D所示);接下我們分析ThreadE,cas失敗了需要重新執行,此時的item已經不為null,所以執行執行下面的操作(q = p.next) == null不等于,p == q這使其實已經是等于了,因為ThreadD改變了了以前頭結點的next節點為自己,所以需要重新遍歷,獲取最新的頭結點,此時的頭結點其實就是ThreadA插入的節點,然后item為null,接著執行下面的判斷,最終p就是p.next節點也就是ThreadB節點,然后cas設置item為null,由于p=p.next,所以p發生了變化,所以需要設置ThreadB為頭結點(如圖E所示)。
看到上面的執行流程可能就有人有疑問了,這不是每次都更新頭結點嗎,沒有優化啊,只看poll方法確實是這樣,那什么時候會產生不是每次都更新頭節點了,那就是當頭節點的item不為null的時候,但是如果按初始化的狀況來看,頭結點的item一直是null,但是當我看了peek方法之后才發現,peek可以改變這個情況,可以設置item不為null的頭結點,其實我們可以在poll方法前調用下peek方法,其實就啟動了優化策略,不是每次更新頭結點,不知道作者是不是這么設計的,反正就是牛皮。
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
我們可以發現size沒有加鎖,就是遍歷了整個隊列,但是遍歷的同時可能在發生poll或者offer,所以size不是特別的精確,用的時候要注意。
ConcurrentLinkedQueue是×××的隊列,所以使用時一定要注意內存溢出的問題,還有在執行size方法時一定要注意這個是不準確的值;在學poll和offer方法時,一定要理解更新head和tail節點的時機,這種優化手段值得我們去學習,我覺得這就是學習源碼的作用,就是學習作者的源碼思想。
參考《Java 并發編程的藝術》
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。