您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“Java中如何實現AQS共享模式與并發工具類”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“Java中如何實現AQS共享模式與并發工具類”這篇文章吧。
下面這個例子非常實用,我是 javadoc 的搬運工:
// 這是一個關于緩存操作的故事 class CachedData { Object data; volatile boolean cacheValid; // 讀寫鎖實例 final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { // 獲取讀鎖 rwl.readLock().lock(); if (!cacheValid) { // 如果緩存過期了,或者為 null // 釋放掉讀鎖,然后獲取寫鎖 (后面會看到,沒釋放掉讀鎖就獲取寫鎖,會發生死鎖情況) rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { // 重新判斷,因為在等待寫鎖的過程中,可能前面有其他寫線程執行過了 data = ... cacheValid = true; } // 獲取讀鎖 (持有寫鎖的情況下,是允許獲取讀鎖的,稱為 “鎖降級”,反之不行。) rwl.readLock().lock(); } finally { // 釋放寫鎖,此時還剩一個讀鎖 rwl.writeLock().unlock(); // Unlock write, still hold read } } try { use(data); } finally { // 釋放讀鎖 rwl.readLock().unlock(); } } }
ReentrantReadWriteLock 分為讀鎖和寫鎖兩個實例,讀鎖是共享鎖,可被多個線程同時使用,寫鎖是獨占鎖。持有寫鎖的線程可以繼續獲取讀鎖,反之不行。
這一節比較重要,我們要先看清楚 ReentrantReadWriteLock 的大框架,然后再到源碼細節。
首先,我們來看下 ReentrantReadWriteLock 的結構,它有好些嵌套類:
大家先仔細看看這張圖中的信息。然后我們把 ReadLock 和 WriteLock 的代碼提出來一起看,清晰一些:
很清楚了,ReadLock 和 WriteLock 中的方法都是通過 Sync 這個類來實現的。Sync 是 AQS 的子類,然后再派生了公平模式和不公平模式。
從它們調用的 Sync 方法,我們可以看到: ReadLock 使用了共享模式,WriteLock 使用了獨占模式。
等等,同一個 AQS 實例怎么可以同時使用共享模式和獨占模式???
這里給大家回顧下 AQS,我們橫向對比下 AQS 的共享模式和獨占模式:
AQS 的精髓在于內部的屬性 state:
對于獨占模式來說,通常就是 0 代表可獲取鎖,1 代表鎖被別人獲取了,重入例外
而共享模式下,每個線程都可以對 state 進行加減操作
也就是說,獨占模式和共享模式對于 state 的操作完全不一樣,那讀寫鎖 ReentrantReadWriteLock 中是怎么使用 state 的呢?答案是將 state 這個 32 位的 int 值分為高 16 位和低 16位,分別用于共享模式和獨占模式。
有了前面的概念,大家心里應該都有數了吧,下面就不再那么啰嗦了,直接代碼分析。
源代碼加注釋 1500 行,并不算難,我們要看的代碼量不大。如果你前面一節都理解了,那么直接從頭開始一行一行往下看就是了,還是比較簡單的。
ReentrantReadWriteLock 的前面幾行很簡單,我們往下滑到 Sync 類,先來看下它的所有的屬性:
abstract static class Sync extends AbstractQueuedSynchronizer { // 下面這塊說的就是將 state 一分為二,高 16 位用于共享模式,低16位用于獨占模式 static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 取 c 的高 16 位值,代表讀鎖的獲取次數(包括重入) static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 取 c 的低 16 位值,代表寫鎖的重入次數,因為寫鎖是獨占模式 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 這個嵌套類的實例用來記錄每個線程持有的讀鎖數量(讀鎖重入) static final class HoldCounter { // 持有的讀鎖數 int count = 0; // 線程 id final long tid = getThreadId(Thread.currentThread()); } // ThreadLocal 的子類 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } /** * 組合使用上面兩個類,用一個 ThreadLocal 來記錄當前線程持有的讀鎖數量 */ private transient ThreadLocalHoldCounter readHolds; // 用于緩存,記錄"最后一個獲取讀鎖的線程"的讀鎖重入次數, // 所以不管哪個線程獲取到讀鎖后,就把這個值占為已用,這樣就不用到 ThreadLocal 中查詢 map 了 // 算不上理論的依據:通常讀鎖的獲取很快就會伴隨著釋放, // 顯然,在 獲取->釋放 讀鎖這段時間,如果沒有其他線程獲取讀鎖的話,此緩存就能幫助提高性能 private transient HoldCounter cachedHoldCounter; // 第一個獲取讀鎖的線程(并且其未釋放讀鎖),以及它持有的讀鎖數量 private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { // 初始化 readHolds 這個 ThreadLocal 屬性 readHolds = new ThreadLocalHoldCounter(); // 為了保證 readHolds 的內存可見性 setState(getState()); // ensures visibility of readHolds } ... }
state 的高 16 位代表讀鎖的獲取次數,包括重入次數,獲取到讀鎖一次加 1,釋放掉讀鎖一次減 1
state 的低 16 位代表寫鎖的獲取次數,因為寫鎖是獨占鎖,同時只能被一個線程獲得,所以它代表重入次數
每個線程都需要維護自己的 HoldCounter,記錄該線程獲取的讀鎖次數,這樣才能知道到底是不是讀鎖重入,用 ThreadLocal 屬性 readHolds 維護
cachedHoldCounter 有什么用?其實沒什么用,但能提示性能。將最后一次獲取讀鎖的線程的 HoldCounter 緩存到這里,這樣比使用 ThreadLocal 性能要好一些,因為 ThreadLocal 內部是基于 map 來查詢的。但是 cachedHoldCounter 這一個屬性畢竟只能緩存一個線程,所以它要起提升性能作用的依據就是:通常讀鎖的獲取緊隨著就是該讀鎖的釋放。我這里可能表達不太好,但是大家應該是懂的吧。
firstReader 和 firstReaderHoldCount 有什么用?其實也沒什么用,但是它也能提示性能。將”第一個”獲取讀鎖的線程記錄在 firstReader 屬性中,這里的第一個不是全局的概念,等這個 firstReader 當前代表的線程釋放掉讀鎖以后,會有后來的線程占用這個屬性的。firstReader 和 firstReaderHoldCount 使得在讀鎖不產生競爭的情況下,記錄讀鎖重入次數非常方便快速
如果一個線程使用了 firstReader,那么它就不需要占用 cachedHoldCounter
個人認為,讀寫鎖源碼中最讓初學者頭疼的就是這幾個用于提升性能的屬性了,使得大家看得云里霧里的。主要是因為 ThreadLocal 內部是通過一個 ThreadLocalMap 來操作的,會增加檢索時間。而很多場景下,執行 unlock 的線程往往就是剛剛最后一次執行 lock 的線程,中間可能沒有其他線程進行 lock。還有就是很多不怎么會發生讀鎖競爭的場景。
上面說了這么多,是希望能幫大家降低后面閱讀源碼的壓力,大家也可以先看看后面的,然后再慢慢體會。
前面我們好像都只說讀鎖,完全沒提到寫鎖,主要是因為寫鎖真的是簡單很多,我也特地將寫鎖的源碼放到了后面,我們先啃下最難的讀鎖先。
下面我就不一行一行按源碼順序說了,我們按照使用來說。
我們來看下讀鎖 ReadLock 的 lock 流程:
// ReadLock public void lock() { sync.acquireShared(1); } // AQS public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
然后我們就會進到 Sync 類的 tryAcquireShared 方法:
在 AQS 中,如果 tryAcquireShared(arg) 方法返回值小于 0 代表沒有獲取到共享鎖(讀鎖),大于 0 代表獲取到
回顧 AQS 共享模式:tryAcquireShared 方法不僅僅在 acquireShared 的最開始被使用,這里是 try,也就可能會失敗,如果失敗的話,執行后面的 doAcquireShared,進入到阻塞隊列,然后等待前驅節點喚醒。喚醒以后,還是會調用 tryAcquireShared 進行獲取共享鎖的。當然,喚醒以后再 try 是很容易獲得鎖的,因為這個節點已經排了很久的隊了,組織是會照顧它的。
所以,你在看下面這段代碼的時候,要想象到兩種獲取讀鎖的場景,一種是新來的,一種是排隊排到它的。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // exclusiveCount(c) 不等于 0,說明有線程持有寫鎖, // 而且不是當前線程持有寫鎖,那么當前線程獲取讀鎖失敗 // (另,如果持有寫鎖的是當前線程,是可以繼續獲取讀鎖的) if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 讀鎖的獲取次數 int r = sharedCount(c); // 讀鎖獲取是否需要被阻塞,稍后細說。為了進去下面的分支,假設這里不阻塞就好了 if (!readerShouldBlock() && // 判斷是否會溢出 (2^16-1,沒那么容易溢出的) r < MAX_COUNT && // 下面這行 CAS 是將 state 屬性的高 16 位加 1,低 16 位不變,如果成功就代表獲取到了讀鎖 compareAndSetState(c, c + SHARED_UNIT)) { // ======================= // 進到這里就是獲取到了讀鎖 // ======================= if (r == 0) { // r == 0 說明此線程是第一個獲取讀鎖的,或者說在它前面獲取讀鎖的都走光光了,它也算是第一個吧 // 記錄 firstReader 為當前線程,及其持有的讀鎖數量:1 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 進來這里,說明是 firstReader 重入獲取讀鎖(這非常簡單,count 加 1 結束) firstReaderHoldCount++; } else { // 前面我們說了 cachedHoldCounter 用于緩存最后一個獲取讀鎖的線程 // 如果 cachedHoldCounter 緩存的不是當前線程,設置為緩存當前線程的 HoldCounter HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 到這里,那么就是 cachedHoldCounter 緩存的是當前線程,但是 count 為 0, // 大家可以思考一下:這里為什么要 set ThreadLocal 呢?(當然,答案肯定不在這塊代碼中) // 既然 cachedHoldCounter 緩存的是當前線程, // 當前線程肯定調用過 readHolds.get() 進行初始化 ThreadLocal readHolds.set(rh); // count 加 1 rh.count++; } // return 大于 0 的數,代表獲取到了共享鎖 return 1; } // 往下看 return fullTryAcquireShared(current); }
上面的代碼中,要進入 if 分支,需要滿足:readerShouldBlock() 返回 false,并且 CAS 要成功(我們先不要糾結 MAX_COUNT 溢出)。
那我們反向推,怎么樣進入到最后的 fullTryAcquireShared:
readerShouldBlock() 返回 true,2 種情況:
在 FairSync 中說的是 hasQueuedPredecessors(),即阻塞隊列中有其他元素在等待鎖。
也就是說,公平模式下,有人在排隊呢,你新來的不能直接獲取鎖
在 NonFairSync 中說的是 apparentlyFirstQueuedIsExclusive(),即判斷阻塞隊列中 head 的第一個后繼節點是否是來獲取寫鎖的,如果是的話,讓這個寫鎖先來,避免寫鎖饑餓。
作者給寫鎖定義了更高的優先級,所以如果碰上獲取寫鎖的線程馬上就要獲取到鎖了,獲取讀鎖的線程不應該和它搶。
如果 head.next 不是來獲取寫鎖的,那么可以隨便搶,因為是非公平模式,大家比比 CAS 速度
compareAndSetState(c, c + SHARED_UNIT) 這里 CAS 失敗,存在競爭。可能是和另一個讀鎖獲取競爭,當然也可能是和另一個寫鎖獲取操作競爭。
然后就會來到 fullTryAcquireShared 中再次嘗試:
/** * 1\. 剛剛我們說了可能是因為 CAS 失敗,如果就此返回,那么就要進入到阻塞隊列了, * 想想有點不甘心,因為都已經滿足了 !readerShouldBlock(),也就是說本來可以不用到阻塞隊列的, * 所以進到這個方法其實是增加 CAS 成功的機會 * 2\. 在 NonFairSync 情況下,雖然 head.next 是獲取寫鎖的,我知道它等待很久了,我沒想和它搶, * 可是如果我是來重入讀鎖的,那么只能表示對不起了 */ final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 別忘了這外層有個 for 循環 for (;;) { int c = getState(); // 如果其他線程持有了寫鎖,自然這次是獲取不到讀鎖了,乖乖到阻塞隊列排隊吧 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { /** * 進來這里,說明: * 1\. exclusiveCount(c) == 0:寫鎖沒有被占用 * 2\. readerShouldBlock() 為 true,說明阻塞隊列中有其他線程在等待 * * 既然 should block,那進來這里是干什么的呢? * 答案:是進來處理讀鎖重入的! * */ // firstReader 線程重入讀鎖,直接到下面的 CAS if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { // cachedHoldCounter 緩存的不是當前線程 // 那么到 ThreadLocal 中獲取當前線程的 HoldCounter // 如果當前線程從來沒有初始化過 ThreadLocal 中的值,get() 會執行初始化 rh = readHolds.get(); // 如果發現 count == 0,也就是說,純屬上一行代碼初始化的,那么執行 remove // 然后往下兩三行,乖乖排隊去 if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) // 排隊去。 return -1; } /** * 這塊代碼我看了蠻久才把握好它是干嘛的,原來只需要知道,它是處理重入的就可以了。 * 就是為了確保讀鎖重入操作能成功,而不是被塞到阻塞隊列中等待 * * 另一個信息就是,這里對于 ThreadLocal 變量 readHolds 的處理: * 如果 get() 后發現 count == 0,居然會做 remove() 操作, * 這行代碼對于理解其他代碼是有幫助的 */ } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 這里 CAS 成功,那么就意味著成功獲取讀鎖了 // 下面需要做的是設置 firstReader 或 cachedHoldCounter if (sharedCount(c) == 0) { // 如果發現 sharedCount(c) 等于 0,就將當前線程設置為 firstReader firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 下面這幾行,就是將 cachedHoldCounter 設置為當前線程 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } // 返回大于 0 的數,代表獲取到了讀鎖 return 1; } } }
firstReader 是每次將讀鎖獲取次數從 0 變為 1 的那個線程。
能緩存到 firstReader 中就不要緩存到 cachedHoldCounter 中。
上面的源碼分析應該說得非常詳細了,如果到這里你不太能看懂上面的有些地方的注釋,那么可以先往后看,然后再多看幾遍。
下面我們看看讀鎖釋放的流程:
// ReadLock public void unlock() { sync.releaseShared(1); }
// Sync public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); // 這句代碼其實喚醒 獲取寫鎖的線程,往下看就知道了 return true; } return false; } // Sync protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1) // 如果等于 1,那么這次解鎖后就不再持有鎖了,把 firstReader 置為 null,給后來的線程用 // 為什么不順便設置 firstReaderHoldCount = 0?因為沒必要,其他線程使用的時候自己會設值 firstReader = null; else firstReaderHoldCount--; } else { // 判斷 cachedHoldCounter 是否緩存的是當前線程,不是的話要到 ThreadLocal 中取 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { // 這一步將 ThreadLocal remove 掉,防止內存泄漏。因為已經不再持有讀鎖了 readHolds.remove(); if (count <= 0) // 就是那種,lock() 一次,unlock() 好幾次的逗比 throw unmatchedUnlockException(); } // count 減 1 --rh.count; } for (;;) { int c = getState(); // nextc 是 state 高 16 位減 1 后的值 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // 如果 nextc == 0,那就是 state 全部 32 位都為 0,也就是讀鎖和寫鎖都空了 // 此時這里返回 true 的話,其實是幫助喚醒后繼節點中的獲取寫鎖的線程 return nextc == 0; } }
讀鎖釋放的過程還是比較簡單的,主要就是將 hold count 減 1,如果減到 0 的話,還要將 ThreadLocal 中的 remove 掉。
然后是在 for 循環中將 state 的高 16 位減 1,如果發現讀鎖和寫鎖都釋放光了,那么喚醒后繼的獲取寫鎖的線程。
寫鎖是獨占鎖。
如果有讀鎖被占用,寫鎖獲取是要進入到阻塞隊列中等待的。
// WriteLock public void lock() { sync.acquire(1); } // AQS public final void acquire(int arg) { if (!tryAcquire(arg) && // 如果 tryAcquire 失敗,那么進入到阻塞隊列等待 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // Sync protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // 看下這里返回 false 的情況: // c != 0 && w == 0: 寫鎖可用,但是有線程持有讀鎖(也可能是自己持有) // c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他線程持有寫鎖 // 也就是說,只要有讀鎖或寫鎖被占用,這次就不能獲取到寫鎖 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 這里不需要 CAS,仔細看就知道了,能到這里的,只可能是寫鎖重入,不然在上面的 if 就攔截了 setState(c + acquires); return true; } // 如果寫鎖獲取不需要 block,那么進行 CAS,成功就代表獲取到了寫鎖 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
下面看一眼 writerShouldBlock() 的判定,然后你再回去看一篇寫鎖獲取過程。
static final class NonfairSync extends Sync { // 如果是非公平模式,那么 lock 的時候就可以直接用 CAS 去搶鎖,搶不到再排隊 final boolean writerShouldBlock() { return false; // writers can always barge } ... } static final class FairSync extends Sync { final boolean writerShouldBlock() { // 如果是公平模式,那么如果阻塞隊列有線程等待的話,就乖乖去排隊 return hasQueuedPredecessors(); } ... }
// WriteLock public void unlock() { sync.release(1); } // AQS public final boolean release(int arg) { // 1\. 釋放鎖 if (tryRelease(arg)) { // 2\. 如果獨占鎖釋放"完全",喚醒后繼節點 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // Sync // 釋放鎖,是線程安全的,因為寫鎖是獨占鎖,具有排他性 // 實現很簡單,state 減 1 就是了 protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); // 如果 exclusiveCount(nextc) == 0,也就是說包括重入的,所有的寫鎖都釋放了, // 那么返回 true,這樣會進行喚醒后繼節點的操作。 return free; }
看到這里,是不是發現寫鎖相對于讀鎖來說要簡單很多。
Doug Lea 沒有說寫鎖更高級,如果有線程持有讀鎖,那么寫鎖獲取也需要等待。
不過從源碼中也可以看出,確實會給寫鎖一些特殊照顧,如非公平模式下,為了提高吞吐量,lock 的時候會先 CAS 競爭一下,能成功就代表讀鎖獲取成功了,但是如果發現 head.next 是獲取寫鎖的線程,就不會去做 CAS 操作。
Doug Lea 將持有寫鎖的線程,去獲取讀鎖的過程稱為鎖降級(Lock downgrading)。這樣,此線程就既持有寫鎖又持有讀鎖。
但是,鎖升級是不可以的。線程持有讀鎖的話,在沒釋放的情況下不能去獲取寫鎖,因為會發生死鎖。
回去看下寫鎖獲取的源碼:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // 看下這里返回 false 的情況: // c != 0 && w == 0: 寫鎖可用,但是有線程持有讀鎖(也可能是自己持有) // c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他線程持有寫鎖 // 也就是說,只要有讀鎖或寫鎖被占用,這次就不能獲取到寫鎖 if (w == 0 || current != getExclusiveOwnerThread()) return false; ... } ... }
仔細想想,如果線程 a 先獲取了讀鎖,然后獲取寫鎖,那么線程 a 就到阻塞隊列休眠了,自己把自己弄休眠了,而且可能之后就沒人去喚醒它了。
以上是“Java中如何實現AQS共享模式與并發工具類”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。