您好,登錄后才能下訂單哦!
怎么深入理解JUC中的Semaphore,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
前面我們分析了同步器 CountDownLatch 和 CyclicBarrier 的設計和實現,這兩個同步器在使用上都有一個共同的特點,就是在構造時需要指定參與的線程數目,然后對計數器執行減值操作。本文將要介紹的 Semaphore 信號量同樣在構造時需要指定一個 int 類型 permits 參數,不過該參數并不用于指定參與的線程數目,相反,Semaphore 并不限制參與的線程數,該參數用于限制同一時間最大允許執行的線程數目上限。
參與到 Semaphore 中的線程如果希望繼續運行,需要從 Semaphore 那里申請獲取一個或多個令牌,只有成功拿到令牌的線程才允許繼續執行,否則需要阻塞等待,并在執行完成之后需要歸還令牌。參數 permits 可以理解為令牌的總數,只要 Semaphore 手上有可用的令牌,就允許有新的線程過來申請。一個線程一次性可以申請一個或多個令牌,只要令牌的數量足夠多,Semaphore 就允許同一個時間有多個線程并行執行。
下面以一個排隊就餐的例子來演示 Semaphore 的基本使用,假設一個餐廳一次性最多只能容納 5 個人同時就餐,但是因為菜品口味極佳,所以生意非常好,來就餐的人多于餐廳能夠同時容納的人數上限,所以超出的人需要在外面排隊等待叫號。假設今天有 20 個人前來就餐,那么叫號的過程可以實現如下:
private static final int MAX_COUNT = 5; private static class Person implements Runnable { private Semaphore semaphore; public Person(Semaphore semaphore) { this.semaphore = semaphore; } @Override public void run() { try { System.out.println("Thread " + Thread.currentThread().getName() + " is waiting."); semaphore.acquire(); System.out.println("Thread " + Thread.currentThread().getName() + " is eating."); TimeUnit.SECONDS.sleep(RandomUtils.nextInt(1, 3)); System.out.println("Thread " + Thread.currentThread().getName() + " ate up."); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(); } } } public static void main(String[] args) { // 使用公平鎖,保證叫號盡量的公平 Semaphore semaphore = new Semaphore(MAX_COUNT, true); for (int i = 0; i < 20; i++) { new Thread(new Person(semaphore), String.valueOf(i)).start(); } }
上述示例中當一個顧客到達時需要調用 Semaphore#acquire
方法申請獲取令牌(即詢問是否有空位),如果沒有空閑的令牌則需要等待。當一個顧客就餐完畢之后需要歸還之前申請到的令牌(執行 Semaphore#release
方法),此時允許下一位顧客申請令牌進入餐廳就餐。
下面來看一下 Semaphore 的設計與實現。Semaphore 同樣基于 AQS 實現,其內部類 Sync 繼承自 AbstractQueuedSynchronizer,并派生出 FairSync 和 NonfairSync 兩個子類,分別表示公平鎖和非公平鎖,這些設計與前面文章中介紹的基于 AQS 實現的 ReentrantLock 和 ReentrantReadWriteLock 如出一轍。Semaphore 在構造時允許我們通過參數指定是使用公平鎖還是非公平鎖,默認為非公平鎖,如下:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
Semaphore 中定義的方法在實現上均委托給 Sync 對象執行,并復用 AQS 的 state 字段記錄當前剩余可用的令牌數。下面重點來分析一下 Semaphore 申請和歸還令牌的方法實現,即 Semaphore#acquire
和 Semaphore#release
方法。首先來看一下令牌申請的過程,Semaphore 提供了多個版本的 Semaphore#acquire
方法實現,包括:
Semaphore#acquire()
:申請 1 個令牌,如果成功則立即返回,否則阻塞等待,期間支持響應中斷請求。
Semaphore#acquire(int)
:相對于 Semaphore#acquire()
的區別在于一次性申請多個令牌。
Semaphore#acquireUninterruptibly()
:申請 1 個令牌,如果成功則立即返回,否則阻塞等待,期間忽略中斷請求。
Semaphore#acquireUninterruptibly(int)
:相對于 Semaphore#acquireUninterruptibly()
的區別在于一次性申請多個令牌。
Semaphore#tryAcquire()
:嘗試申請 1 個令牌,不管成功還是失敗都會立即返回,成功則返回 true,失敗則返回 false。
Semaphore#tryAcquire(int)
:相對于 Semaphore#tryAcquire()
的區別在于一次性申請多個令牌。
Semaphore#tryAcquire(long, TimeUnit)
:嘗試申請 1 個令牌,相對于 Semaphore#tryAcquire()
引入了超時等待機制。
Semaphore#tryAcquire(int, long, TimeUnit)
:相對于 Semaphore#tryAcquire(long, TimeUnit)
的區別在于一次性申請多個令牌。
這些申請令牌的方法在實現上大同小異,下面以 Semaphore#acquire()
為例分析一下具體的執行過程。方法實現如下:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
上述方法直接委托給 AQS 的 AbstractQueuedSynchronizer#acquireSharedInterruptibly
方法執行,申請令牌單位為 1。前面在分析 AQS 時已經介紹了該方法的運行機制,下面重點來看一下 Sync 對于模板方法 AbstractQueuedSynchronizer#tryAcquireShared
的實現(以 NonfairSync 為例):
// NonfairSync#tryAcquireShared protected int tryAcquireShared(int acquires) { return this.nonfairTryAcquireShared(acquires); } // Sync#nonfairTryAcquireShared final int nonfairTryAcquireShared(int acquires) { for (; ; ) { // 獲取 state 狀態值 int available = this.getState(); // 計算剩余可用的資源數 int remaining = available - acquires; if (remaining < 0 // 當前沒有可用的資源 || this.compareAndSetState(available, remaining)) { // 當前有可用的資源,且獲取資源成功 return remaining; } } }
申請令牌的執行流程可以總結為:
獲取當前剩余可用的令牌數,即 state 值;
如果剩余可用的令牌數小于本次申請的數目,則返回差值(負值);
否則,更新 state 值,如果更新成功則說明獲取令牌成功,返回差值(非負值)。
由 AbstractQueuedSynchronizer#acquireSharedInterruptibly
方法的實現我們知道,如果上述過程返回負值,則會將當前線程添加到同步隊列中阻塞等待。
再來看一下令牌歸還的過程,Semaphore 同樣提供了多個版本的 Semaphore#release
方法實現,包括:
Semaphore#release()
:歸還 1 個令牌。
Semaphore#release(int)
:歸還指定數目的令牌。
下面以 Semaphore#release()
方法為例分析一下令牌歸還的執行過程,實現如下:
public void release() { sync.releaseShared(1); }
上述方法直接委托給 AQS 的 AbstractQueuedSynchronizer#releaseShared
方法執行,歸還令牌單位為 1。前面在分析 AQS 時同樣已經介紹了該方法的運行機制,下面重點來看一下 Sync 對于模板方法 AbstractQueuedSynchronizer#tryReleaseShared
的實現:
protected final boolean tryReleaseShared(int releases) { for (; ; ) { // 獲取 state 狀態值 int current = this.getState(); // 計算釋放之后剩余的資源數 int next = current + releases; if (next < current) { // overflow // 溢出 throw new Error("Maximum permit count exceeded"); } // 更新 state 狀態值 if (this.compareAndSetState(current, next)) { return true; } } }
令牌歸還的執行過程如上述代碼注釋,比較簡單,但是有一點疑問的是什么情況下 next 值會溢出?
一般來說線程在歸還令牌之前必須先申請令牌,這樣就能夠保證空閑令牌的數量始終不會大于我們在構造 Semaphore 時指定的初始值,然而在上述方法實現中,我們并沒有看到有任何邏輯限定在調用 Semaphore#release
方法之前必須調用 Semaphore#acquire
方法。實際上 Semaphore 在實現時也的確沒有添加這一限制,也就說任何線程都可以調用 Semaphore#release
方法歸還令牌,即使它之前從來沒有申請過令牌,這樣就會導致令牌的數量溢出。官方文檔中有如下說明:
There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link #acquire}. Correct usage of a semaphore is established by programming convention in the application.
也就是說,Semaphore 并不要求線程在歸還令牌之前一定要先申請獲取令牌,具體由應用程序自己決定。
我們分析了 Semaphore 信號量的設計與實現,了解到 Semaphore 同樣是基于 AQS 實現的同步器組件。Semaphore 通過令牌機制以限定參與的線程在同一時間執行的線程數目不能超過令牌的個數,在語義和實現上都比較簡單,但功能卻很強大。最后還需要注意的一點就是,Semaphore 并不要求在歸還令牌之前一定要先申請獲取令牌,開發者可以結合自身業務邏輯來靈活應用這一點。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。