您好,登錄后才能下訂單哦!
ConcurrentLinkedQueue介紹
ConcurrentLinkedQueue是線程安全的隊列,它適用于“高并發”的場景。
它是一個基于鏈接節點的無界線程安全隊列,按照 FIFO(先進先出)原則對元素進行排序。隊列元素中不可以放置null元素(內部實現的特殊節點除外)。
ConcurrentLinkedQueue原理和數據結構
ConcurrentLinkedQueue的數據結構,如下圖所示:
說明:
1. ConcurrentLinkedQueue繼承于AbstractQueue。
2. ConcurrentLinkedQueue內部是通過鏈表來實現的。它同時包含鏈表的頭節點head和尾節點tail。ConcurrentLinkedQueue按照 FIFO(先進先出)原則對元素進行排序。元素都是從尾部插入到鏈表,從頭部開始返回。
3. ConcurrentLinkedQueue的鏈表Node中的next的類型是volatile,而且鏈表數據item的類型也是volatile。關于volatile,我們知道它的語義包含:“即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入”。ConcurrentLinkedQueue就是通過volatile來實現多線程對競爭資源的互斥訪問的。
ConcurrentLinkedQueue函數列表
// 創建一個最初為空的 ConcurrentLinkedQueue。 ConcurrentLinkedQueue() // 創建一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。 ConcurrentLinkedQueue(Collection<? extends E> c) // 將指定元素插入此隊列的尾部。 boolean add(E e) // 如果此隊列包含指定元素,則返回 true。 boolean contains(Object o) // 如果此隊列不包含任何元素,則返回 true。 boolean isEmpty() // 返回在此隊列元素上以恰當順序進行迭代的迭代器。 Iterator<E> iterator() // 將指定元素插入此隊列的尾部。 boolean offer(E e) // 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。 E peek() // 獲取并移除此隊列的頭,如果此隊列為空,則返回 null。 E poll() // 從隊列中移除指定元素的單個實例(如果存在)。 boolean remove(Object o) // 返回此隊列中的元素數量。 int size() // 返回以恰當順序包含此隊列所有元素的數組。 Object[] toArray() // 返回以恰當順序包含此隊列所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。 <T> T[] toArray(T[] a)
下面從ConcurrentLinkedQueue的創建,添加,刪除這幾個方面對它進行分析。
1 創建
下面以ConcurrentLinkedQueue()來進行說明。
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
說明:在構造函數中,新建了一個“內容為null的節點”,并設置表頭head和表尾tail的值為新節點。
head和tail的定義如下:
private transient volatile Node<E> head; private transient volatile Node<E> tail;
head和tail都是volatile類型,他們具有volatile賦予的含義:“即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入”。
Node的聲明如下:
private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
說明:
Node是個單向鏈表節點,next用于指向下一個Node,item用于存儲數據。Node中操作節點數據的API,都是通過Unsafe機制的CAS函數實現的;例如casNext()是通過CAS函數“比較并設置節點的下一個節點”。
2. 添加
下面以add(E e)為例對ConcurrentLinkedQueue中的添加進行說明。
public boolean add(E e) { return offer(e); }
說明:add()實際上是調用的offer()來完成添加操作的。
offer()的源碼如下:
public boolean offer(E e) { // 檢查e是不是null,是的話拋出NullPointerException異常。 checkNotNull(e); // 創建新的節點 final Node<E> newNode = new Node<E>(e); // 將“新的節點”添加到鏈表的末尾。 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 情況1:q為空 if (q == null) { // CAS操作:如果“p的下一個節點為null”(即p為尾節點),則設置p的下一個節點為newNode。 // 如果該CAS操作成功的話,則比較“p和t”(若p不等于t,則設置newNode為新的尾節點),然后返回true。 // 如果該CAS操作失敗,這意味著“其它線程對尾節點進行了修改”,則重新循環。 if (p.casNext(null, newNode)) { if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } } // 情況2:p和q相等 else if (p == q) p = (t != (t = tail)) ? t : head; // 情況3:其它 else p = (p != t && t != (t = tail)) ? t : q; } }
說明:offer(E e)的作用就是將元素e添加到鏈表的末尾。offer()比較的地方是理解for循環,下面區分3種情況對for進行分析。
情況1 -- q為空。這意味著q是尾節點的下一個節點。此時,通過p.casNext(null, newNode)將“p的下一個節點設為newNode”,若設置成功的話,則比較“p和t”(若p不等于t,則設置newNode為新的尾節點),然后返回true。否則的話(意味著“其它線程對尾節點進行了修改”),什么也不做,繼續進行for循環。
p.casNext(null, newNode),是調用CAS對p進行操作。若“p的下一個節點等于null”,則設置“p的下一個節點等于newNode”;設置成功的話,返回true,失敗的話返回false。
情況2 -- p和q相等。這種情況什么時候會發生呢?通過“情況3”,我們知道,經過“情況3”的處理后,p的值可能等于q。
此時,若尾節點沒有發生變化的話,那么,應該是頭節點發生了變化,則設置p為頭節點,然后重新遍歷鏈表;否則(尾節點變化的話),則設置p為尾節點。
情況3 -- 其它。
我們將p = (p != t && t != (t = tail)) ? t : q;轉換成如下代碼。
if (p==t) { p = q; } else { Node<E> tmp=t; t = tail; if (tmp==t) { p=q; } else { p=t; } }
如果p和t相等,則設置p為q。否則的話,判斷“尾節點是否發生變化”,沒有變化的話,則設置p為q;否則,設置p為尾節點。
checkNotNull()的源碼如下:
private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); }
3. 刪除
下面以poll()為例對ConcurrentLinkedQueue中的刪除進行說明。
public E poll() { // 設置“標記” restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 情況1 // 表頭的數據不為null,并且“設置表頭的數據為null”這個操作成功的話; // 則比較“p和h”(若p!=h,即表頭發生了變化,則更新表頭,即設置表頭為p),然后返回原表頭的item值。 if (item != null && p.casItem(item, null)) { if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 情況2 // 表頭的下一個節點為null,即鏈表只有一個“內容為null的表頭節點”。則更新表頭為p,并返回null。 else if ((q = p.next) == null) { updateHead(h, p); return null; } // 情況3 // 這可能到由于“情況4”的發生導致p=q,在該情況下跳轉到restartFromHead標記重新操作。 else if (p == q) continue restartFromHead; // 情況4 // 設置p為q else p = q; } } }
說明:poll()的作用就是刪除鏈表的表頭節點,并返回被刪節點對應的值。poll()的實現原理和offer()比較類似,下面根將or循環劃分為4種情況進行分析。
情況1:“表頭節點的數據”不為null,并且“設置表頭節點的數據為null”這個操作成功。
p.casItem(item, null) -- 調用CAS函數,比較“節點p的數據值”與item是否相等,是的話,設置節點p的數據值為null。
在情況1發生時,先比較“p和h”,若p!=h,即表頭發生了變化,則調用updateHead()更新表頭;然后返回刪除節點的item值。
updateHead()的源碼如下:
final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
說明:updateHead()的最終目的是更新表頭為p,并設置h的下一個節點為h本身。
casHead(h,p)是通過CAS函數設置表頭,若表頭等于h的話,則設置表頭為p。
lazySetNext()的源碼如下:
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
putOrderedObject()函數,我們在前面一章“TODO”中介紹過。h.lazySetNext(h)的作用是通過CAS函數設置h的下一個節點為h自身,該設置可能會延遲執行。
情況2:如果表頭的下一個節點為null,即鏈表只有一個“內容為null的表頭節點”。
則調用updateHead(h, p),將表頭更新p;然后返回null。
情況3:p=q
在“情況4”的發生后,會導致p=q;此時,“情況3”就會發生。當“情況3”發生后,它會跳轉到restartFromHead標記重新操作。
情況4:其它情況。
設置p=q。
ConcurrentLinkedQueue示例
import java.util.*; import java.util.concurrent.*; /* * ConcurrentLinkedQueue是“線程安全”的隊列,而LinkedList是非線程安全的。 * * 下面是“多個線程同時操作并且遍歷queue”的示例 * (01) 當queue是ConcurrentLinkedQueue對象時,程序能正常運行。 * (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。 * * */ public class ConcurrentLinkedQueueDemo1 { // TODO: queue是LinkedList對象時,程序會出錯。 //private static Queue<String> queue = new LinkedList<String>(); private static Queue<String> queue = new ConcurrentLinkedQueue<String>(); public static void main(String[] args) { // 同時啟動兩個線程對queue進行操作! new MyThread("ta").start(); new MyThread("tb").start(); } private static void printAll() { String value; Iterator iter = queue.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 6) { // “線程名” + "-" + "序號" String val = Thread.currentThread().getName()+i; queue.add(val); // 通過“Iterator”遍歷queue。 printAll(); } } } }
(某一次)運行結果:
ta1, ta1, tb1, tb1, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, ta3, ta1, tb3, tb1, ta4, ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, tb4, ta5, tb5, ta6, tb6,
結果說明:如果將源碼中的queue改成LinkedList對象時,程序會產生ConcurrentModificationException異常。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。