亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

JUC的LinkedBlockingQueue如何實現

發布時間:2021-12-21 10:23:36 來源:億速云 閱讀:129 作者:iii 欄目:大數據

本篇內容介紹了“JUC的LinkedBlockingQueue如何實現”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

由 Blocking 字樣可以推斷出 LinkedBlockingQueue 是阻塞隊列,前面我們介紹過阻塞隊列和非阻塞隊列在實現上的區別,知道阻塞隊列一般是基于鎖機制來保證線程安全,本文我們就一起來分析一下 LinkedBlockingQueue 是如何基于鎖構建線程安全隊列的。

同樣由 Linked 關鍵字我們可以推斷出 LinkedBlockingQueue 底層依賴于鏈表實現,在 LinkedBlockingQueue 的內部實現了一個單鏈表,用以存放隊列元素。其中,結點 Node 類定義如下:

static class Node<E> {

    E item;
    Node<E> next;

    Node(E x) {
        item = x;
    }
}

其中 next 指針的指向分為 3 種情況:

  1. 指向某個具體的后繼結點。

  2. 指向自己,意味著后繼結點為 head.next

  3. 指向 null,說明當前結點是隊列的尾結點,沒有后繼結點。

LinkedBlockingQueue 定義了 head 和 last 指針分別指向隊列的頭結點和尾結點。此外,LinkedBlockingQueue 還定義了如下字段:

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {

    /** 當前隊列的容量上限 */
    private final int capacity;
    /** 記錄當前隊列的元素個數 */
    private final AtomicInteger count = new AtomicInteger();

    /** 隊列頭結點 */
    transient Node<E> head;
    /** 隊列尾結點 */
    private transient Node<E> last;

    /** 用于控制 take、poll 等操作,保證同一時間只有一個線程從隊列獲取元素 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** 條件隊列,記錄出隊列時因為隊列為空而等待的線程 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 用戶控制 put、offer 等操作,保證同一時間只有一個線程往隊列添加元素 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** 條件隊列,記錄入隊列時因為隊列已滿而等待的線程 */
    private final Condition notFull = putLock.newCondition();

    // ... 省略方法定義

}

由上述字段定義可以看出,LinkedBlockingQueue 限制了隊列的容量上限,并使用 AtomicInteger 類型字段對隊列中的元素個數進行計數。雖然 LinkedBlockingQueue 底層依賴于鏈表實現,理論上是無界的,但是 LinkedBlockingQueue 在實現上卻限制了隊列的容量上限(默認為 Integer.MAX_VALUE)。

此外,針對出隊列和入隊列操作,LinkedBlockingQueue 分別設置了一把獨占可重入鎖,即 takeLock 和 putLock,從而保證同一時間只有一個線程執行出隊列操作,只有一個線程執行入隊列操作,且出隊列的線程與入隊列的線程彼此之間不相互影響。針對一些阻塞版本的出隊列入隊列方法,如果隊列為空,則出隊列線程會被記錄到條件隊列 notEmpty 中進行等待,如果隊列已滿,則入隊列線程會被記錄到條件隊列 notFull 中進行等待。

BlockingQueue 接口

BlockingQueue 接口繼承自 Queue 接口,用于描述阻塞隊列。當隊列無法及時響應用戶請求時,例如當我們嘗試從空隊列中獲取元素,或者繼續往已滿的有界隊列中添加元素,BlockingQueue 定義了以下 4 種響應形式:

  1. 拋出異常。

  2. 立即返回特殊值,例如 null 或 false。

  3. 無限期阻塞當前請求,直到隊列狀態變為可用。

  4. 超時阻塞當前請求,直到隊列狀態變為可用。

BlockingQueue 接口的定義如下:

public interface BlockingQueue<E> extends Queue<E> {

    boolean offer(E e);
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    boolean add(E e);
    void put(E e) throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    E take() throws InterruptedException;

    boolean remove(Object o);

    boolean contains(Object o);
    int remainingCapacity();

    int drainTo(Collection<? super E> c);
    int drainTo(Collection<? super E> c, int maxElements);

}

針對各方法的含義說明如下:

  • offer:往隊列中添加元素,如果成功則返回 true,對于有界隊列來說,如果隊列已滿則返回 false,而不是拋出異常。BlockingQueue 同時還聲明了超時版本的 offer 方法。

  • add:往隊列中添加元素,如果成功則返回 true,對于有界隊列來說,如果隊列已滿則拋出 IllegalStateException 異常。

  • put:往隊列中添加元素,對于有界隊列來說,如果隊列已滿則阻塞當前請求,期間支持響應中斷。

  • poll:移除隊列頭結點,并返回結點元素值,如果隊列為空則等待指定時間,并在超時時返回 null,期間支持響應中斷。

  • take:僅獲取頭結點元素值而不刪除結點,如果隊列為空則阻塞等待,期間支持響應中斷。

  • remove:接收一個參數,從隊列中刪除值等于該參數的結點,如果存在多個結點滿足要求,則刪除第一個。

  • contains:接收一個參數,判斷隊列中是否存在值等于該參數的結點。

  • remainingCapacity:返回隊列的剩余容量,如果是無界隊列,則返回 Integer.MAX_VALUE

  • drainTo:從隊列中移除所有(或指定個數)結點,并將結點元素放入參數指定的集合中返回,相對于逐個移除更加高效。

核心方法實現

LinkedBlockingQueue 實現自 BlockingQueue 接口,下面針對核心方法的實現逐一進行分析。

添加元素:offer & add & put

針對添加元素的操作,LinkedBlockingQueue 實現了 LinkedBlockingQueue#offerLinkedBlockingQueue#addLinkedBlockingQueue#put 方法,其中 LinkedBlockingQueue#add 是對 LinkedBlockingQueue#offer 的封裝,并在隊列已滿時拋出 IllegalStateException 異常。

下面主要展開分析 LinkedBlockingQueue#offerLinkedBlockingQueue#put 方法的實現。首先來看一下 LinkedBlockingQueue#offer 方法,實現如下:

public boolean offer(E e) {
    // 不允許添加 null 元素
    if (e == null) {
        throw new NullPointerException();
    }
    final AtomicInteger count = this.count;
    // 當前隊列已滿,直接返回 false
    if (count.get() == capacity) {
        return false;
    }
    int c = -1;
    // 創建待添加元素對應的結點對象
    Node<E> node = new Node<>(e);
    final ReentrantLock putLock = this.putLock;
    // 加鎖
    putLock.lock();
    try {
        // 再次校驗隊列是否已滿
        if (count.get() < capacity) {
            // 往隊列末端追加結點
            this.enqueue(node);
            // 隊列元素個數計數加 1,并返回添加之前隊列的大小
            c = count.getAndIncrement();
            // 當前隊列在執行添加操作之后仍然存在空閑位置,嘗試喚醒一個之前因為隊列已滿而等待的線程
            if (c + 1 < capacity) {
                notFull.signal();
            }
        }
    } finally {
        // 釋放鎖
        putLock.unlock();
    }
    // c == 0 說明隊列中至少存在一個元素(當前添加的),嘗試喚醒一個之前因為隊列為空而等待的線程
    if (c == 0) {
        this.signalNotEmpty();
    }
    return c >= 0;
}

與 ConcurrentLinkedQueue 一樣,LinkedBlockingQueue 同樣不允許往其中添加 null 元素。如果隊列已滿,則上述方法會直接返回 false,表示添加失敗,否則創建待添加元素對應的結點對象,并繼續執行:

  1. 加鎖,保證同一時間只有一個線程在執行添加操作;

  2. 再次校驗隊列是否已滿,如果已滿則跳轉至步驟 5,否則執行 LinkedBlockingQueue#enqueue 方法往隊列末端插入結點;

  3. 結點個數計數加 1;

  4. 如果在完成本次添加操作之后,隊列仍然未滿,則嘗試喚醒一個之前因為隊列已滿而等待的線程;

  5. 釋放鎖;

  6. 如果本次成功添加了一個元素,則調用 LinkedBlockingQueue#signalNotEmpty 方法嘗試喚醒一個之前因為隊列為空而等待的線程;

  7. 返回。

其中 LinkedBlockingQueue#signalNotEmpty 方法的實現比較簡單,讀者可以參考源碼實現。這里簡單提一下 LinkedBlockingQueue#enqueue 方法,實現如下:

private void enqueue(Node<E> node) {
    last = last.next = node;
}

在 LinkedBlockingQueue 對象被構造出來時,head 和 last 指針均指向一個元素值為 null 的標記結點。由上述方法的實現可以看出當執行入隊列操作時,是將結點賦值給 last 結點的 next 指針,并沒有移除隊列頭部的 null 結點,下文在介紹出隊列操作時返回的都是 head.next 結點元素值,理解了上述插入操作的執行過程也就不會疑惑為什么出隊列時不是直接返回 head 結點的元素值。

LinkedBlockingQueue 還定義了超時版本的 LinkedBlockingQueue#offer(E, long, TimeUnit) 方法,當隊列已滿時,該方法會阻塞等待指定的時間。

下面再來看一下 LinkedBlockingQueue#put 方法,相對于上面介紹的 LinkedBlockingQueue#offer 方法,對于有界隊列而言,如果隊列已滿則該方法將無限期阻塞,方法實現如下:

public void put(E e) throws InterruptedException {
    // 不允許添加 null 元素
    if (e == null) {
        throw new NullPointerException();
    }
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 加鎖,期間支持響應中斷
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is not protected by lock.
         * This works because count can only decrease at this point (all other puts are shut out by lock),
         * and we (or some other waiting put) are signalled if it ever changes from capacity.
         * Similarly for all other uses of count in other wait guards.
         */

        // 隊列已滿,則等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 執行入隊列操作
        this.enqueue(node);
        // 隊列元素個數計數加 1,并返回添加之前隊列的大小
        c = count.getAndIncrement();
        // 當前隊列在執行添加操作之后仍然存在空閑位置,嘗試喚醒一個之前因為隊列已滿而等待的線程
        if (c + 1 < capacity) {
            notFull.signal();
        }
    } finally {
        // 釋放鎖
        putLock.unlock();
    }
    // c == 0 說明隊列中至少存在一個元素(當前添加的),嘗試喚醒一個之前因為隊列為空而等待的線程
    if (c == 0) {
        this.signalNotEmpty();
    }
}

由上述實現可以看出,相對于 LinkedBlockingQueue#offer 方法在隊列已滿時的直接返回,方法 LinkedBlockingQueue#put 會將當前線程添加到條件隊列中等待其它線程釋放隊列空間。

獲取元素:poll & peek & take

針對獲取元素的操作,LinkedBlockingQueue 實現了 LinkedBlockingQueue#pollLinkedBlockingQueue#peekLinkedBlockingQueue#take 方法,其中 LinkedBlockingQueue#peek 方法僅獲取隊列頭結點元素值,而不移除頭結點,實現上比較簡單。下面展開分析 LinkedBlockingQueue#pollLinkedBlockingQueue#take 方法的實現機制。

首先來看一下 LinkedBlockingQueue#poll 方法,LinkedBlockingQueue 針對該方法定義了兩個版本,區別在于當隊列為空時是立即返回還是阻塞等待一段時間,而在實現思路上是一致的。這里以不帶超時參數的版本為例展開分析,實現如下:

public E poll() {
    final AtomicInteger count = this.count;
    // 當前隊列為空,直接返回 null
    if (count.get() == 0) {
        return null;
    }
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    // 加鎖
    takeLock.lock();
    try {
        // 如果當前隊列不為空
        if (count.get() > 0) {
            // 獲取隊列頭結點元素,并移除頭結點
            x = this.dequeue();
            // 隊列元素計數值減 1,這里返回的是減 1 之前的值
            c = count.getAndDecrement();
            // 隊列在執行移除操作后至少還存在一個元素,嘗試喚醒一個之前因為隊列為空而阻塞的線程
            if (c > 1) {
                notEmpty.signal();
            }
        }
    } finally {
        // 釋放鎖
        takeLock.unlock();
    }
    /*
     * 之前隊列已滿,但是經過本次 poll 操作之后,至少有一個空閑位置,
     * 嘗試喚醒一個之前因為隊列已滿而阻塞的線程
     */
    if (c == capacity) {
        this.signalNotFull();
    }
    return x;
}

如果隊列為空則上述方法會直接返回 null,否則繼續執行:

  1. 加鎖,保證同一時間只有一個線程在執行獲取操作;

  2. 再次校驗隊列是否為空,如果為空則跳轉至步驟 5,否則執行 LinkedBlockingQueue#dequeue 方法移除隊列頭結點,并返回結點元素值;

  3. 結點個數計數減 1;

  4. 如果在完成本次移除操作之后,隊列仍然非空,則嘗試喚醒一個之前因為隊列為空而等待的線程;

  5. 釋放鎖;

  6. 如果本次成功移除了一個元素,則調用 LinkedBlockingQueue#signalNotFull 方法嘗試喚醒一個之前因為隊列已滿而等待的線程;

  7. 返回。

其中 LinkedBlockingQueue#signalNotFull 方法的實現比較簡單,讀者可以參考源碼實現。前面我們分析了入隊列 LinkedBlockingQueue#enqueue 方法,下面來看一下出隊列方法,實現如下:

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    // 自引用,等待 GC 回收
    h.next = h; // help GC
    head = first;
    // 獲取真正隊列頭結點的元素值
    E x = first.item;
    // 將隊列頭結點元素值置為 null
    first.item = null;
    return x;
}

理解了前面入隊列的過程,則上述出隊列的實現也就一目了然,只要清楚隊列的頭結點一直是一個值為 null 的結點,而真正有效的隊列頭結點是該結點的 next 結點。

LinkedBlockingQueue 還定義了超時版本的 LinkedBlockingQueue#poll(long, TimeUnit) 方法,當隊列為空時,該方法會阻塞等待指定的時間。

下面再來看一下 LinkedBlockingQueue#take 方法,相對于上面介紹的 LinkedBlockingQueue#poll 方法,對于有界隊列而言,如果隊列為空則該方法將無限期阻塞,方法實現如下:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 加鎖
    takeLock.lockInterruptibly();
    try {
        // 如果隊列為空,則等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 獲取隊列頭結點元素,并移除頭結點
        x = this.dequeue();
        // 隊列元素計數值減 1,這里返回的是減 1 之前的值
        c = count.getAndDecrement();
        // 隊列在執行移除操作后至少還存在一個元素,嘗試喚醒一個之前因為隊列為空而阻塞的線程
        if (c > 1) {
            notEmpty.signal();
        }
    } finally {
        // 釋放鎖
        takeLock.unlock();
    }
    /*
     * 之前隊列已滿,但是經過本次 poll 操作之后,至少有一個空閑位置,
     * 嘗試喚醒一個之前因為隊列已滿而阻塞的線程
     */
    if (c == capacity) {
        this.signalNotFull();
    }
    return x;
}

由上述實現可以看出,相對于 LinkedBlockingQueue#poll 方法在隊列為空時的直接返回,方法 LinkedBlockingQueue#take 會將當前線程添加到條件隊列中等待其它線程添加新的隊列元素。

移除元素:remove

針對移除元素的操作,LinkedBlockingQueue 實現了 LinkedBlockingQueue#remove 方法,并提供了有參和無參的版本,其中無參版本實際上是委托給 LinkedBlockingQueue#poll 方法執行的。下面來分析一下有參版本的實現,如下:

public boolean remove(Object o) {
    if (o == null) {
        return false;
    }
    // 鎖定出隊列、入隊列操作
    this.fullyLock();
    try {
        // 從頭開始遍歷隊列
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果找到第一個目標元素,則移除
            if (o.equals(p.item)) {
                // 移除 p 結點,如果執行移除之后隊列有空閑位置,
                // 則嘗試喚醒一個之前因為隊列已滿而阻塞的線程
                this.unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 釋放出隊列、入隊列操作
        this.fullyUnlock();
    }
}

上述方法接收一個參數,并執行刪除元素值等于該參數的結點,如果存在多個滿足條件的結點,則刪除第一個。在執行刪除操作之前會獲取 putLock 和 takeLock 兩把鎖,以防止刪除期間有其它線程執行出隊列或入隊列操作。

其它操作:size & contains

最后來看一下 LinkedBlockingQueue#containsLinkedBlockingQueue#size 方法的實現,前者用于檢查隊列是否包含值等于參數的結點,實現如下:

public boolean contains(Object o) {
    if (o == null) {
        return false;
    }
    // 鎖定出隊列、入隊列操作
    this.fullyLock();
    try {
        // 從頭開始遍歷鏈表,并逐一比對
        for (Node<E> p = head.next; p != null; p = p.next) {
            if (o.equals(p.item)) {
                return true;
            }
        }
        return false;
    } finally {
        // 釋放出隊列、入隊列操作
        this.fullyUnlock();
    }
}

方法 LinkedBlockingQueue#size 用于返回隊列的結點個數,前面已經介紹了 LinkedBlockingQueue 定義了一個 AtomicInteger 類型的字段用于計數隊列的結點個數,所以 LinkedBlockingQueue#size 方法能夠精確的返回,且幾乎沒有性能開銷,同時在實現上非常簡單,如下:

public int size() {
    return count.get();
}

“JUC的LinkedBlockingQueue如何實現”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

仁怀市| 白朗县| 乌鲁木齐市| 阜宁县| 正安县| 宜章县| 英德市| 阳西县| 乐平市| 忻州市| 色达县| 广丰县| 朝阳县| 夏河县| 兰西县| 当雄县| 宁城县| 衡阳市| 嵩明县| 连南| 诏安县| 山阳县| 武清区| 琼海市| 化德县| 依兰县| 黎川县| 永安市| 长沙县| 祁阳县| 江西省| 牡丹江市| 丰城市| 广德县| 万全县| 吴桥县| 五家渠市| 米泉市| 固原市| 定边县| 博罗县|