您好,登錄后才能下訂單哦!
本人免費整理了Java高級資料,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高并發分布式等教程,一共30G,需要自己領取。
傳送門:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q
在上一篇文章中我們對lock和AbstractQueuedSynchronizer(AQS)有了初步的認識。在同步組件的實現中,AQS是核心部分,同步組件的實現者通過使用AQS提供的模板方法實現同步組件語義,AQS則實現了對同步狀態的管理,以及對阻塞線程進行排隊,等待通知等等一些底層的實現處理。AQS的核心也包括了這些方面:同步隊列,獨占式鎖的獲取和釋放,共享鎖的獲取和釋放以及可中斷鎖,超時等待鎖獲取這些特性的實現,而這些實際上則是AQS提供出來的模板方法,歸納整理如下:
獨占式鎖:
void acquire(int arg):獨占式獲取同步狀態,如果獲取失敗則插入同步隊列進行等待; void acquireInterruptibly(int arg):與acquire方法相同,但在同步隊列中進行等待的時候可以檢測中斷; boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly基礎上增加了超時等待功能,在超時時間內沒有獲得同步狀態返回false; boolean release(int arg):釋放同步狀態,該方法會喚醒在同步隊列中的下一個節點
共享式鎖:
void acquireShared(int arg):共享式獲取同步狀態,與獨占式的區別在于同一時刻有多個線程獲取同步狀態; void acquireSharedInterruptibly(int arg):在acquireShared方法基礎上增加了能響應中斷的功能; boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly基礎上增加了超時等待的功能; boolean releaseShared(int arg):共享式釋放同步狀態
要想掌握AQS的底層實現,其實也就是對這些模板方法的邏輯進行學習。在學習這些模板方法之前,我們得首先了解下AQS中的同步隊列是一種什么樣的數據結構,因為同步隊列是AQS對同步狀態的管理的基石。
當共享資源被某個線程占有,其他請求該資源的線程將會阻塞,從而進入同步隊列。就數據結構而言,隊列的實現方式無外乎兩者一是通過數組的形式,另外一種則是鏈表的形式。AQS中的同步隊列則是通過鏈式方式進行實現。接下來,很顯然我們至少會抱有這樣的疑問:1. 節點的數據結構是什么樣的?2. 是單向還是雙向?3. 是帶頭結點的還是不帶頭節點的?我們依舊先是通過看源碼的方式。
在AQS有一個靜態內部類Node,其中有這樣一些屬性:
volatile int waitStatus //節點狀態 volatile Node prev //當前節點/線程的前驅節點 volatile Node next; //當前節點/線程的后繼節點 volatile Thread thread;//加入同步隊列的線程引用 Node nextWaiter;//等待隊列中的下一個節點
節點的狀態有以下這些:
int CANCELLED = 1//節點從同步隊列中取消 int SIGNAL = -1//后繼節點的線程處于等待狀態,如果當前節點釋放同步狀態會通知后繼節點,使得后繼節點的線程能夠運行; int CONDITION = -2//當前節點進入等待隊列中 int PROPAGATE = -3//表示下一次共享式同步狀態獲取將會無條件傳播下去 int INITIAL = 0;//初始狀態
現在我們知道了節點的數據結構類型,并且每個節點擁有其前驅和后繼節點,很顯然這是一個雙向隊列。同樣的我們可以用一段demo看一下。
public?class?LockDemo?{ ????private?static?ReentrantLock?lock?=?new?ReentrantLock(); ????public?static?void?main(String[]?args)?{ ????????for?(int?i?=?0;?i?<?5;?i++)?{ ????????????Thread?thread?=?new?Thread(()?->?{ ????????????????lock.lock(); ????????????????try?{ ????????????????????Thread.sleep(10000); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????}?finally?{ ????????????????????lock.unlock(); ????????????????} ????????????}); ????????????thread.start(); ????????} ????} }
實例代碼中開啟了5個線程,先獲取鎖之后再睡眠10S中,實際上這里讓線程睡眠是想模擬出當線程無法獲取鎖時進入同步隊列的情況。通過debug,當Thread-4(在本例中最后一個線程)獲取鎖失敗后進入同步時,AQS時現在的同步隊列如圖所示:
Thread-0先獲得鎖后進行睡眠,其他線程(Thread-1,Thread-2,Thread-3,Thread-4)獲取鎖失敗進入同步隊列,同時也可以很清楚的看出來每個節點有兩個域:prev(前驅)和next(后繼),并且每個節點用來保存獲取同步狀態失敗的線程引用以及等待狀態等信息。另外AQS中有兩個重要的成員變量:
private?transient?volatile?Node?head; private?transient?volatile?Node?tail;
也就是說AQS實際上通過頭尾指針來管理同步隊列,同時實現包括獲取鎖失敗的線程進行入隊,釋放鎖時對同步隊列中的線程進行通知等核心方法。其示意圖如下:
通過對源碼的理解以及做實驗的方式,現在我們可以清楚的知道這樣幾點:
節點的數據結構,即AQS的靜態內部類Node,節點的等待狀態等信息;
同步隊列是一個雙向隊列,AQS通過持有頭尾指針管理同步隊列;
那么,節點如何進行入隊和出隊是怎樣做的了?實際上這對應著鎖的獲取和釋放兩個操作:獲取鎖失敗進行入隊操作,獲取鎖成功進行出隊操作。
我們繼續通過看源碼和debug的方式來看,還是以上面的demo為例,調用lock()方法是獲取獨占式鎖,獲取失敗就將當前線程加入同步隊列,成功則線程執行。而lock()方法實際上會調用AQS的acquire()方法,源碼如下
public?final?void?acquire(int?arg)?{ ????????//先看同步狀態是否獲取成功,如果成功則方法結束返回 ????????//若失敗則先調用addWaiter()方法再調用acquireQueued()方法 ????????if?(!tryAcquire(arg)?&& ????????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg)) ????????????selfInterrupt(); }
關鍵信息請看注釋,acquire根據當前獲得同步狀態成功與否做了兩件事情:1. 成功,則方法結束返回,2. 失敗,則先調用addWaiter()然后在調用acquireQueued()方法。
獲取同步狀態失敗,入隊操作
當線程獲取獨占式鎖失敗后就會將當前線程加入同步隊列,那么加入隊列的方式是怎樣的了?我們接下來就應該去研究一下addWaiter()和acquireQueued()。addWaiter()源碼如下:
private?Node?addWaiter(Node?mode)?{ ????????//?1\.?將當前線程構建成Node類型 ????????Node?node?=?new?Node(Thread.currentThread(),?mode); ????????//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failure ????????//?2\.?當前尾節點是否為null? ????????Node?pred?=?tail; ????????if?(pred?!=?null)?{ ????????????//?2.2?將當前節點尾插入的方式插入同步隊列中 ????????????node.prev?=?pred; ????????????if?(compareAndSetTail(pred,?node))?{ ????????????????pred.next?=?node; ????????????????return?node; ????????????} ????????} ????????//?2.1\.?當前同步隊列尾節點為null,說明當前線程是第一個加入同步隊列進行等待的線程 ????????enq(node); ????????return?node; }
分析可以看上面的注釋。程序的邏輯主要分為兩個部分:1. 當前同步隊列的尾節點為null,調用方法enq()插入;2. 當前隊列的尾節點不為null,則采用尾插入(compareAndSetTail()方法)的方式入隊。另外還會有另外一個問題:如果?if (compareAndSetTail(pred, node))
為false怎么辦?會繼續執行到enq()方法,同時很明顯compareAndSetTail是一個CAS操作,通常來說如果CAS操作失敗會繼續自旋(死循環)進行重試。因此,經過我們這樣的分析,enq()方法可能承擔兩個任務:1. 處理當前同步隊列尾節點為null時進行入隊操作;2. 如果CAS尾插入節點失敗后負責自旋進行嘗試。那么是不是真的就像我們分析的一樣了?只有源碼會告訴我們答案:),enq()源碼如下:
private?Node?enq(final?Node?node)?{ ????????for?(;;)?{ ????????????Node?t?=?tail; ????????????if?(t?==?null)?{?//?Must?initialize ????????????????//1\.?構造頭結點 ????????????????if?(compareAndSetHead(new?Node())) ????????????????????tail?=?head; ????????????}?else?{ ????????????????//?2\.?尾插入,CAS操作失敗自旋嘗試 ????????????????node.prev?=?t; ????????????????if?(compareAndSetTail(t,?node))?{ ????????????????????t.next?=?node; ????????????????????return?t; ????????????????} ????????????} ????????} }
在上面的分析中我們可以看出在第1步中會先創建頭結點,說明同步隊列是帶頭結點的鏈式存儲結構。帶頭結點與不帶頭結點相比,會在入隊和出隊的操作中獲得更大的便捷性,因此同步隊列選擇了帶頭結點的鏈式存儲結構。那么帶頭節點的隊列初始化時機是什么?自然而然是在tail為null時,即當前線程是第一次插入同步隊列。compareAndSetTail(t, node)方法會利用CAS操作設置尾節點,如果CAS操作失敗會在for (;;)
for死循環中不斷嘗試,直至成功return返回為止。因此,對enq()方法可以做這樣的總結:
在當前線程是第一個加入同步隊列時,調用compareAndSetHead(new Node())方法,完成鏈式隊列的頭結點的初始化;
自旋不斷嘗試CAS尾插入節點直至成功為止。
現在我們已經很清楚獲取獨占式鎖失敗的線程包裝成Node然后插入同步隊列的過程了?那么緊接著會有下一個問題?在同步隊列中的節點(線程)會做什么事情了來保證自己能夠有機會獲得獨占式鎖了?帶著這樣的問題我們就來看看acquireQueued()方法,從方法名就可以很清楚,這個方法的作用就是排隊獲取鎖的過程,源碼如下:
final?boolean?acquireQueued(final?Node?node,?int?arg)?{ ????????boolean?failed?=?true; ????????try?{ ????????????boolean?interrupted?=?false; ????????????for?(;;)?{ ????????????????//?1\.?獲得當前節點的先驅節點 ????????????????final?Node?p?=?node.predecessor(); ????????????????//?2\.?當前節點能否獲取獨占式鎖????????????????? ????????????????//?2.1?如果當前節點的先驅節點是頭結點并且成功獲取同步狀態,即可以獲得獨占式鎖 ????????????????if?(p?==?head?&&?tryAcquire(arg))?{ ????????????????????//隊列頭指針用指向當前節點 ????????????????????setHead(node); ????????????????????//釋放前驅節點 ????????????????????p.next?=?null;?//?help?GC ????????????????????failed?=?false; ????????????????????return?interrupted; ????????????????} ????????????????//?2.2?獲取鎖失敗,線程進入等待狀態等待獲取獨占式鎖 ????????????????if?(shouldParkAfterFailedAcquire(p,?node)?&& ????????????????????parkAndCheckInterrupt()) ????????????????????interrupted?=?true; ????????????} ????????}?finally?{ ????????????if?(failed) ????????????????cancelAcquire(node); ????????} }
程序邏輯通過注釋已經標出,整體來看這是一個這又是一個自旋的過程(for (;;)),代碼首先獲取當前節點的先驅節點,如果先驅節點是頭結點的并且成功獲得同步狀態的時候(if (p == head && tryAcquire(arg))),當前節點所指向的線程能夠獲取鎖。反之,獲取鎖失敗進入等待狀態。整體示意圖為下圖:
獲取鎖成功,出隊操作
獲取鎖的節點出隊的邏輯是:
//隊列頭結點引用指向當前節點 setHead(node); //釋放前驅節點 p.next?=?null;?//?help?GC failed?=?false; return?interrupted;
setHead()方法為:
private?void?setHead(Node?node)?{ ????????head?=?node; ????????node.thread?=?null; ????????node.prev?=?null; }
將當前節點通過setHead()方法設置為隊列的頭結點,然后將之前的頭結點的next域設置為null并且pre域也為null,即與隊列斷開,無任何引用方便GC時能夠將內存進行回收。示意圖如下:
那么當獲取鎖失敗的時候會調用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他們做了什么事情。shouldParkAfterFailedAcquire()方法源碼為:
private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{ ????int?ws?=?pred.waitStatus; ????if?(ws?==?Node.SIGNAL) ????????/* ?????????*?This?node?has?already?set?status?asking?a?release ?????????*?to?signal?it,?so?it?can?safely?park. ?????????*/ ????????return?true; ????if?(ws?>?0)?{ ????????/* ?????????*?Predecessor?was?cancelled.?Skip?over?predecessors?and ?????????*?indicate?retry. ?????????*/ ????????do?{ ????????????node.prev?=?pred?=?pred.prev; ????????}?while?(pred.waitStatus?>?0); ????????pred.next?=?node; ????}?else?{ ????????/* ?????????*?waitStatus?must?be?0?or?PROPAGATE.??Indicate?that?we ?????????*?need?a?signal,?but?don't?park?yet.??Caller?will?need?to ?????????*?retry?to?make?sure?it?cannot?acquire?before?parking. ?????????*/ ????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL); ????} ????return?false; }
shouldParkAfterFailedAcquire()方法主要邏輯是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
使用CAS將節點狀態由INITIAL設置成SIGNAL,表示當前線程阻塞。當compareAndSetWaitStatus設置失敗則說明shouldParkAfterFailedAcquire方法返回false,然后會在acquireQueued()方法中for (;;)死循環中會繼續重試,直至compareAndSetWaitStatus設置節點狀態位為SIGNAL時shouldParkAfterFailedAcquire返回true時才會執行方法parkAndCheckInterrupt()方法,該方法的源碼為:
private?final?boolean?parkAndCheckInterrupt()?{ ????????//使得該線程阻塞 ????????LockSupport.park(this); ????????return?Thread.interrupted(); }
該方法的關鍵是會調用LookSupport.park()方法(關于LookSupport會在以后的文章進行討論),該方法是用來阻塞當前線程的。因此到這里就應該清楚了,acquireQueued()在自旋過程中主要完成了兩件事情:
如果當前節點的前驅節點是頭節點,并且能夠獲得同步狀態的話,當前線程能夠獲得鎖該方法執行結束退出;
獲取鎖失敗的話,先將節點狀態設置成SIGNAL,然后調用LookSupport.park方法使得當前線程阻塞。
經過上面的分析,獨占式鎖的獲取過程也就是acquire()方法的執行流程如下圖所示:
獨占鎖的釋放就相對來說比較容易理解了,廢話不多說先來看下源碼:
public?final?boolean?release(int?arg)?{ ????????if?(tryRelease(arg))?{ ????????????Node?h?=?head; ????????????if?(h?!=?null?&&?h.waitStatus?!=?0) ????????????????unparkSuccessor(h); ????????????return?true; ????????} ????????return?false; }
這段代碼邏輯就比較容易理解了,如果同步狀態釋放成功(tryRelease返回true)則會執行if塊中的代碼,當head指向的頭結點不為null,并且該節點的狀態值不為0的話才會執行unparkSuccessor()方法。unparkSuccessor方法源碼:
private?void?unparkSuccessor(Node?node)?{ ????/* ?????*?If?status?is?negative?(i.e.,?possibly?needing?signal)?try ?????*?to?clear?in?anticipation?of?signalling.??It?is?OK?if?this ?????*?fails?or?if?status?is?changed?by?waiting?thread. ?????*/ ????int?ws?=?node.waitStatus; ????if?(ws?<?0) ????????compareAndSetWaitStatus(node,?ws,?0); ????/* ?????*?Thread?to?unpark?is?held?in?successor,?which?is?normally ?????*?just?the?next?node.??But?if?cancelled?or?apparently?null, ?????*?traverse?backwards?from?tail?to?find?the?actual ?????*?non-cancelled?successor. ?????*/ ????//頭節點的后繼節點 ????Node?s?=?node.next; ????if?(s?==?null?||?s.waitStatus?>?0)?{ ????????s?=?null; ????????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev) ????????????if?(t.waitStatus?<=?0) ????????????????s?=?t; ????} ????if?(s?!=?null) ????????//后繼節點不為null時喚醒該線程 ????????LockSupport.unpark(s.thread); }
源碼的關鍵信息請看注釋,首先獲取頭節點的后繼節點,當后繼節點的時候會調用LookSupport.unpark()方法,該方法會喚醒該節點的后繼節點所包裝的線程。因此,每一次鎖釋放后就會喚醒隊列中該節點的后繼節點所引用的線程,從而進一步可以佐證獲得鎖的過程是一個FIFO(先進先出)的過程。
到現在我們終于啃下了一塊硬骨頭了,通過學習源碼的方式非常深刻的學習到了獨占式鎖的獲取和釋放的過程以及同步隊列。可以做一下總結:
線程獲取鎖失敗,線程被封裝成Node進行入隊操作,核心方法在于addWaiter()和enq(),同時enq()完成對同步隊列的頭結點初始化工作以及CAS操作失敗的重試;
線程獲取鎖是一個自旋的過程,當且僅當 當前節點的前驅節點是頭結點并且成功獲得同步狀態時,節點出隊即該節點引用的線程獲得鎖,否則,當不滿足條件時就會調用LookSupport.park()方法使得線程阻塞;
釋放鎖的時候會喚醒后繼節點;
總體來說:在獲取同步狀態時,AQS維護一個同步隊列,獲取同步狀態失敗的線程會加入到隊列中進行自旋;移除隊列(或停止自旋)的條件是前驅節點是頭結點并且成功獲得了同步狀態。在釋放同步狀態時,同步器會調用unparkSuccessor()方法喚醒后繼節點。
獨占鎖特性學習
我們知道lock相較于synchronized有一些更方便的特性,比如能響應中斷以及超時等待等特性,現在我們依舊采用通過學習源碼的方式來看看能夠響應中斷是怎么實現的。可響應中斷式鎖可調用方法lock.lockInterruptibly();而該方法其底層會調用AQS的acquireInterruptibly方法,源碼為:
public?final?void?acquireInterruptibly(int?arg) ????????throws?InterruptedException?{ ????if?(Thread.interrupted()) ????????throw?new?InterruptedException(); ????if?(!tryAcquire(arg)) ????????//線程獲取鎖失敗 ????????doAcquireInterruptibly(arg); }
在獲取同步狀態失敗后就會調用doAcquireInterruptibly方法:
private?void?doAcquireInterruptibly(int?arg) ????throws?InterruptedException?{ ????//將節點插入到同步隊列中 ????final?Node?node?=?addWaiter(Node.EXCLUSIVE); ????boolean?failed?=?true; ????try?{ ????????for?(;;)?{ ????????????final?Node?p?=?node.predecessor(); ????????????//獲取鎖出隊 ????????????if?(p?==?head?&&?tryAcquire(arg))?{ ????????????????setHead(node); ????????????????p.next?=?null;?//?help?GC ????????????????failed?=?false; ????????????????return; ????????????} ????????????if?(shouldParkAfterFailedAcquire(p,?node)?&& ????????????????parkAndCheckInterrupt()) ????????????????//線程中斷拋異常 ????????????????throw?new?InterruptedException(); ????????} ????}?finally?{ ????????if?(failed) ????????????cancelAcquire(node); ????} }
關鍵信息請看注釋,現在看這段代碼就很輕松了吧:),與acquire方法邏輯幾乎一致,唯一的區別是當parkAndCheckInterrupt返回true時即線程阻塞時該線程被中斷,代碼拋出被中斷異常。
通過調用lock.tryLock(timeout,TimeUnit)方式達到超時等待獲取鎖的效果,該方法會在三種情況下才會返回:
在超時時間內,當前線程成功獲取了鎖;
當前線程在超時時間內被中斷;
超時時間結束,仍未獲得鎖返回false。
我們仍然通過采取閱讀源碼的方式來學習底層具體是怎么實現的,該方法會調用AQS的方法tryAcquireNanos(),源碼為:
public?final?boolean?tryAcquireNanos(int?arg,?long?nanosTimeout) ????????throws?InterruptedException?{ ????if?(Thread.interrupted()) ????????throw?new?InterruptedException(); ????return?tryAcquire(arg)?|| ????????//實現超時等待的效果 ????????doAcquireNanos(arg,?nanosTimeout); }
很顯然這段源碼最終是靠doAcquireNanos方法實現超時等待的效果,該方法源碼如下:
private?boolean?doAcquireNanos(int?arg,?long?nanosTimeout) ????????throws?InterruptedException?{ ????if?(nanosTimeout?<=?0L) ????????return?false; ????//1\.?根據超時時間和當前時間計算出截止時間 ????final?long?deadline?=?System.nanoTime()?+?nanosTimeout; ????final?Node?node?=?addWaiter(Node.EXCLUSIVE); ????boolean?failed?=?true; ????try?{ ????????for?(;;)?{ ????????????final?Node?p?=?node.predecessor(); ????????????//2\.?當前線程獲得鎖出隊列 ????????????if?(p?==?head?&&?tryAcquire(arg))?{ ????????????????setHead(node); ????????????????p.next?=?null;?//?help?GC ????????????????failed?=?false; ????????????????return?true; ????????????} ????????????//?3.1?重新計算超時時間 ????????????nanosTimeout?=?deadline?-?System.nanoTime(); ????????????//?3.2?已經超時返回false ????????????if?(nanosTimeout?<=?0L) ????????????????return?false; ????????????//?3.3?線程阻塞等待? ????????????if?(shouldParkAfterFailedAcquire(p,?node)?&& ????????????????nanosTimeout?>?spinForTimeoutThreshold) ????????????????LockSupport.parkNanos(this,?nanosTimeout); ????????????//?3.4?線程被中斷拋出被中斷異常 ????????????if?(Thread.interrupted()) ????????????????throw?new?InterruptedException(); ????????} ????}?finally?{ ????????if?(failed) ????????????cancelAcquire(node); ????} }
程序邏輯如圖所示:
程序邏輯同獨占鎖可響應中斷式獲取基本一致,唯一的不同在于獲取鎖失敗后,對超時時間的處理上,在第1步會先計算出按照現在時間和超時時間計算出理論上的截止時間,比如當前時間是8h20min,超時時間是10min,那么根據deadline = System.nanoTime() + nanosTimeout
計算出剛好達到超時時間時的系統時間就是8h 10min+10min = 8h 20min。然后根據deadline - System.nanoTime()
就可以判斷是否已經超時了,比如,當前系統時間是8h 30min很明顯已經超過了理論上的系統時間8h 20min,deadline - System.nanoTime()
計算出來就是一個負數,自然而然會在3.2步中的If判斷之間返回false。如果還沒有超時即3.2步中的if判斷為true時就會繼續執行3.3步通過LockSupport.parkNanos使得當前線程阻塞,同時在3.4步增加了對中斷的檢測,若檢測出被中斷直接拋出被中斷異常。
在聊完AQS對獨占鎖的實現后,我們繼續一鼓作氣的來看看共享鎖是怎樣實現的?共享鎖的獲取方法為acquireShared,源碼為:
public?final?void?acquireShared(int?arg)?{ ????if?(tryAcquireShared(arg)?<?0) ????????doAcquireShared(arg); }
這段源碼的邏輯很容易理解,在該方法中會首先調用tryAcquireShared方法,tryAcquireShared返回值是一個int類型,當返回值為大于等于0的時候方法結束說明獲得成功獲取鎖,否則,表明獲取同步狀態失敗即所引用的線程獲取鎖失敗,會執行doAcquireShared方法,該方法的源碼為:
private?void?doAcquireShared(int?arg)?{ ????final?Node?node?=?addWaiter(Node.SHARED); ????boolean?failed?=?true; ????try?{ ????????boolean?interrupted?=?false; ????????for?(;;)?{ ????????????final?Node?p?=?node.predecessor(); ????????????if?(p?==?head)?{ ????????????????int?r?=?tryAcquireShared(arg); ????????????????if?(r?>=?0)?{ ????????????????????//?當該節點的前驅節點是頭結點且成功獲取同步狀態 ????????????????????setHeadAndPropagate(node,?r); ????????????????????p.next?=?null;?//?help?GC ????????????????????if?(interrupted) ????????????????????????selfInterrupt(); ????????????????????failed?=?false; ????????????????????return; ????????????????} ????????????} ????????????if?(shouldParkAfterFailedAcquire(p,?node)?&& ????????????????parkAndCheckInterrupt()) ????????????????interrupted?=?true; ????????} ????}?finally?{ ????????if?(failed) ????????????cancelAcquire(node); ????} }
現在來看這段代碼會不會很容易了?邏輯幾乎和獨占式鎖的獲取一模一樣,這里的自旋過程中能夠退出的條件是當前節點的前驅節點是頭結點并且tryAcquireShared(arg)返回值大于等于0即能成功獲得同步狀態。
共享鎖的釋放在AQS中會調用方法releaseShared:
public?final?boolean?releaseShared(int?arg)?{ ????if?(tryReleaseShared(arg))?{ ????????doReleaseShared(); ????????return?true; ????} ????return?false; }
當成功釋放同步狀態之后即tryReleaseShared會繼續執行doReleaseShared方法:
private?void?doReleaseShared()?{ ????/* ?????*?Ensure?that?a?release?propagates,?even?if?there?are?other ?????*?in-progress?acquires/releases.??This?proceeds?in?the?usual ?????*?way?of?trying?to?unparkSuccessor?of?head?if?it?needs ?????*?signal.?But?if?it?does?not,?status?is?set?to?PROPAGATE?to ?????*?ensure?that?upon?release,?propagation?continues. ?????*?Additionally,?we?must?loop?in?case?a?new?node?is?added ?????*?while?we?are?doing?this.?Also,?unlike?other?uses?of ?????*?unparkSuccessor,?we?need?to?know?if?CAS?to?reset?status ?????*?fails,?if?so?rechecking. ?????*/ ????for?(;;)?{ ????????Node?h?=?head; ????????if?(h?!=?null?&&?h?!=?tail)?{ ????????????int?ws?=?h.waitStatus; ????????????if?(ws?==?Node.SIGNAL)?{ ????????????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0)) ????????????????????continue;????????????//?loop?to?recheck?cases ????????????????unparkSuccessor(h); ????????????} ????????????else?if?(ws?==?0?&& ?????????????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE)) ????????????????continue;????????????????//?loop?on?failed?CAS ????????} ????????if?(h?==?head)???????????????????//?loop?if?head?changed ????????????break; ????} }
這段方法跟獨占式鎖釋放過程有點點不同,在共享式鎖的釋放過程中,對于能夠支持多個線程同時訪問的并發組件,必須保證多個線程能夠安全的釋放同步狀態,這里采用的CAS保證,當CAS操作失敗continue,在下一次循環中進行重試。
關于可中斷鎖以及超時等待的特性其實現和獨占式鎖可中斷獲取鎖以及超時等待的實現幾乎一致,具體的就不再說了,如果理解了上面的內容對這部分的理解也是水到渠成的。
通過這篇,加深了對AQS的底層實現更加清楚了,也對了解并發組件的實現原理打下了基礎,學無止境,繼續加油:);如果覺得不錯,請給贊,嘿嘿。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。