您好,登錄后才能下訂單哦!
本文為死磕Synchronized底層實現第三篇文章,內容為重量級鎖實現。
本系列文章將對HotSpot的synchronized
鎖實現進行全面分析,內容包括偏向鎖、輕量級鎖、重量級鎖的加鎖、解鎖、鎖升級流程的原理及源碼分析,希望給在研究synchronized
路上的同學一些幫助。
?
當出現多個線程同時競爭鎖時,會進入到synchronizer.cpp#slow_enter
方法
void?ObjectSynchronizer::slow_enter(Handle?obj,?BasicLock*?lock,?TRAPS)?{ ??markOop?mark?=?obj->mark(); ??assert(!mark->has_bias_pattern(),?"should?not?see?bias?pattern?here"); ??//?如果是無鎖狀態 ??if?(mark->is_neutral())?{ ????lock->set_displaced_header(mark); ????if?(mark?==?(markOop)?Atomic::cmpxchg_ptr(lock,?obj()->mark_addr(),?mark))?{ ??????TEVENT?(slow_enter:?release?stacklock)?; ??????return?; ????} ????//?Fall?through?to?inflate()?... ??}?else ??//?如果是輕量級鎖重入 ??if?(mark->has_locker()?&&?THREAD->is_lock_owned((address)mark->locker()))?{ ????assert(lock?!=?mark->locker(),?"must?not?re-lock?the?same?lock"); ????assert(lock?!=?(BasicLock*)obj->mark(),?"don't?relock?with?same?BasicLock"); ????lock->set_displaced_header(NULL); ????return; ??} ?... ? ??//?這時候需要膨脹為重量級鎖,膨脹前,設置Displaced?Mark?Word為一個特殊值,代表該鎖正在用一個重量級鎖的monitor ??lock->set_displaced_header(markOopDesc::unused_mark()); ??//先調用inflate膨脹為重量級鎖,該方法返回一個ObjectMonitor對象,然后調用其enter方法 ??ObjectSynchronizer::inflate(THREAD,?obj())->enter(THREAD); }
在inflate
中完成膨脹過程。
ObjectMonitor?*?ATTR?ObjectSynchronizer::inflate?(Thread?*?Self,?oop?object)?{ ??... ??for?(;;)?{ ??????const?markOop?mark?=?object->mark()?; ??????assert?(!mark->has_bias_pattern(),?"invariant")?; ???? ??????//?mark是以下狀態中的一種: ??????//?*??Inflated(重量級鎖狀態)?????-?直接返回 ??????//?*??Stack-locked(輕量級鎖狀態)?-?膨脹 ??????//?*??INFLATING(膨脹中)????-?忙等待直到膨脹完成 ??????//?*??Neutral(無鎖狀態)??????-?膨脹 ??????//?*??BIASED(偏向鎖)???????-?非法狀態,在這里不會出現 ??????//?CASE:?inflated ??????if?(mark->has_monitor())?{ ??????????//?已經是重量級鎖狀態了,直接返回 ??????????ObjectMonitor?*?inf?=?mark->monitor()?; ??????????... ??????????return?inf?; ??????} ??????//?CASE:?inflation?in?progress ??????if?(mark?==?markOopDesc::INFLATING())?{ ?????????//?正在膨脹中,說明另一個線程正在進行鎖膨脹,continue重試 ?????????TEVENT?(Inflate:?spin?while?INFLATING)?; ?????????//?在該方法中會進行spin/yield/park等操作完成自旋動作? ?????????ReadStableMark(object)?; ?????????continue?; ??????} ? ??????if?(mark->has_locker())?{ ??????????//?當前輕量級鎖狀態,先分配一個ObjectMonitor對象,并初始化值 ??????????ObjectMonitor?*?m?=?omAlloc?(Self)?; ?????????? ??????????m->Recycle(); ??????????m->_Responsible??=?NULL?; ??????????m->OwnerIsThread?=?0?; ??????????m->_recursions???=?0?; ??????????m->_SpinDuration?=?ObjectMonitor::Knob_SpinLimit?;???//?Consider:?maintain?by?type/class ??//?將鎖對象的mark?word設置為INFLATING?(0)狀態? ??????????markOop?cmp?=?(markOop)?Atomic::cmpxchg_ptr?(markOopDesc::INFLATING(),?object->mark_addr(),?mark)?; ??????????if?(cmp?!=?mark)?{ ?????????????omRelease?(Self,?m,?true)?; ?????????????continue?;???????//?Interference?--?just?retry ??????????} ??????????//?棧中的displaced?mark?word ??????????markOop?dmw?=?mark->displaced_mark_helper()?; ??????????assert?(dmw->is_neutral(),?"invariant")?; ??????????//?設置monitor的字段 ??????????m->set_header(dmw)?; ??????????//?owner為Lock?Record ??????????m->set_owner(mark->locker()); ??????????m->set_object(object); ??????????... ??????????//?將鎖對象頭設置為重量級鎖狀態 ??????????object->release_set_mark(markOopDesc::encode(m)); ?????????... ??????????return?m?; ??????} ??????//?CASE:?neutral ?? ? ??????//?分配以及初始化ObjectMonitor對象 ??????ObjectMonitor?*?m?=?omAlloc?(Self)?; ??????//?prepare?m?for?installation?-?set?monitor?to?initial?state ??????m->Recycle(); ??????m->set_header(mark); ??????//?owner為NULL ??????m->set_owner(NULL); ??????m->set_object(object); ??????m->OwnerIsThread?=?1?; ??????m->_recursions???=?0?; ??????m->_Responsible??=?NULL?; ??????m->_SpinDuration?=?ObjectMonitor::Knob_SpinLimit?;???????//?consider:?keep?metastats?by?type/class ??//?用CAS替換對象頭的mark?word為重量級鎖狀態 ??????if?(Atomic::cmpxchg_ptr?(markOopDesc::encode(m),?object->mark_addr(),?mark)?!=?mark)?{ ??????????//?不成功說明有另外一個線程在執行inflate,釋放monitor對象 ??????????m->set_object?(NULL)?; ??????????m->set_owner??(NULL)?; ??????????m->OwnerIsThread?=?0?; ??????????m->Recycle()?; ??????????omRelease?(Self,?m,?true)?; ??????????m?=?NULL?; ??????????continue?; ??????????//?interference?-?the?markword?changed?-?just?retry. ??????????//?The?state-transitions?are?one-way,?so?there's?no?chance?of ??????????//?live-lock?--?"Inflated"?is?an?absorbing?state. ??????} ??????... ??????return?m?; ??} }
inflate
中是一個for循環,主要是為了處理多線程同時調用inflate的情況。然后會根據鎖對象的狀態進行不同的處理:
1.已經是重量級狀態,說明膨脹已經完成,直接返回
2.如果是輕量級鎖則需要進行膨脹操作
3.如果是膨脹中狀態,則進行忙等待
4.如果是無鎖狀態則需要進行膨脹操作
其中輕量級鎖和無鎖狀態需要進行膨脹操作,輕量級鎖膨脹流程如下:
1.調用omAlloc
分配一個ObjectMonitor
對象(以下簡稱monitor),在omAlloc
方法中會先從線程私有的monitor
集合omFreeList
中分配對象,如果omFreeList
中已經沒有monitor
對象,則從JVM全局的gFreeList
中分配一批monitor
到omFreeList
中。
2.初始化monitor
對象
3.將狀態設置為膨脹中(INFLATING)狀態
4.設置monitor
的header字段為displaced mark word
,owner字段為Lock Record
,obj字段為鎖對象
5.設置鎖對象頭的mark word
為重量級鎖狀態,指向第一步分配的monitor
對象
無鎖狀態下的膨脹流程如下:
1.調用omAlloc
分配一個ObjectMonitor
對象(以下簡稱monitor)
2.初始化monitor
對象
3.設置monitor
的header字段為?mark word
,owner字段為null
,obj字段為鎖對象
4.設置鎖對象頭的mark word
為重量級鎖狀態,指向第一步分配的monitor
對象
至于為什么輕量級鎖需要一個膨脹中(INFLATING)狀態,代碼中的注釋是:
//?Why?do?we?CAS?a?0?into?the?mark-word?instead?of?just?CASing?the //?mark-word?from?the?stack-locked?value?directly?to?the?new?inflated?state? //?Consider?what?happens?when?a?thread?unlocks?a?stack-locked?object. //?It?attempts?to?use?CAS?to?swing?the?displaced?header?value?from?the //?on-stack?basiclock?back?into?the?object?header.??Recall?also?that?the //?header?value?(hashcode,?etc)?can?reside?in?(a)?the?object?header,?or //?(b)?a?displaced?header?associated?with?the?stack-lock,?or?(c)?a?displaced //?header?in?an?objectMonitor.??The?inflate()?routine?must?copy?the?header //?value?from?the?basiclock?on?the?owner's?stack?to?the?objectMonitor,?all //?the?while?preserving?the?hashCode?stability?invariants.??If?the?owner //?decides?to?release?the?lock?while?the?value?is?0,?the?unlock?will?fail //?and?control?will?eventually?pass?from?slow_exit()?to?inflate.??The?owner //?will?then?spin,?waiting?for?the?0?value?to?disappear.???Put?another?way, //?the?0?causes?the?owner?to?stall?if?the?owner?happens?to?try?to //?drop?the?lock?(restoring?the?header?from?the?basiclock?to?the?object) //?while?inflation?is?in-progress.??This?protocol?avoids?races?that?might //?would?otherwise?permit?hashCode?values?to?change?or?"flicker"?for?an?object. //?Critically,?while?object->mark?is?0?mark->displaced_mark_helper()?is?stable. //?0?serves?as?a?"BUSY"?inflate-in-progress?indicator.
我沒太看懂,有知道的同學可以指點下~
膨脹完成之后,會調用enter
方法獲得鎖
void?ATTR?ObjectMonitor::enter(TRAPS)?{ ??? ??Thread?*?const?Self?=?THREAD?; ??void?*?cur?; ??//?owner為null代表無鎖狀態,如果能CAS設置成功,則當前線程直接獲得鎖 ??cur?=?Atomic::cmpxchg_ptr?(Self,?&_owner,?NULL)?; ??if?(cur?==?NULL)?{ ?????... ?????return?; ??} ??//?如果是重入的情況 ??if?(cur?==?Self)?{ ?????//?TODO-FIXME:?check?for?integer?overflow!??BUGID?6557169. ?????_recursions?++?; ?????return?; ??} ??//?當前線程是之前持有輕量級鎖的線程。由輕量級鎖膨脹且第一次調用enter方法,那cur是指向Lock?Record的指針 ??if?(Self->is_lock_owned?((address)cur))?{ ????assert?(_recursions?==?0,?"internal?state?error"); ????//?重入計數重置為1 ????_recursions?=?1?; ????//?設置owner字段為當前線程(之前owner是指向Lock?Record的指針) ????_owner?=?Self?; ????OwnerIsThread?=?1?; ????return?; ??} ??... ??//?在調用系統的同步操作之前,先嘗試自旋獲得鎖 ??if?(Knob_SpinEarly?&&?TrySpin?(Self)?>?0)?{ ?????... ?????//自旋的過程中獲得了鎖,則直接返回 ?????Self->_Stalled?=?0?; ?????return?; ??} ??... ??{? ????... ????for?(;;)?{ ??????jt->set_suspend_equivalent(); ??????//?在該方法中調用系統同步操作 ??????EnterI?(THREAD)?; ??????... ????} ????Self->set_current_pending_monitor(NULL); ???? ??} ??... }
如果當前是無鎖狀態、鎖重入、當前線程是之前持有輕量級鎖的線程則進行簡單操作后返回。
先自旋嘗試獲得鎖,這樣做的目的是為了減少執行操作系統同步操作帶來的開銷
調用EnterI
方法獲得鎖或阻塞
EnterI
方法比較長,在看之前,我們先闡述下其大致原理:
一個ObjectMonitor
對象包括這么幾個關鍵字段:cxq(下圖中的ContentionList),EntryList ,WaitSet,owner。
其中cxq ,EntryList ,WaitSet都是由ObjectWaiter的鏈表結構,owner指向持有鎖的線程。
當一個線程嘗試獲得鎖時,如果該鎖已經被占用,則會將該線程封裝成一個ObjectWaiter
對象插入到cxq的隊列的隊首,然后調用park
函數掛起當前線程。在linux系統上,park
函數底層調用的是gclib庫的pthread_cond_wait
,JDK的ReentrantLock
底層也是用該方法掛起線程的。更多細節可以看我之前的兩篇文章:關于同步的一點思考-下,linux內核級同步機制–futex
當線程釋放鎖時,會從cxq或EntryList中挑選一個線程喚醒,被選中的線程叫做Heir presumptive
即假定繼承人(應該是這樣翻譯),就是圖中的Ready Thread
,假定繼承人被喚醒后會嘗試獲得鎖,但synchronized
是非公平的,所以假定繼承人不一定能獲得鎖(這也是它叫”假定”繼承人的原因)。
如果線程獲得鎖后調用Object#wait
方法,則會將線程加入到WaitSet中,當被Object#notify
喚醒后,會將線程從WaitSet移動到cxq或EntryList中去。需要注意的是,當調用一個鎖對象的wait
或notify
方法時,如當前鎖的狀態是偏向鎖或輕量級鎖則會先膨脹成重量級鎖。
synchronized
的monitor
鎖機制和JDK的ReentrantLock
與Condition
是很相似的,ReentrantLock
也有一個存放等待獲取鎖線程的鏈表,Condition
也有一個類似WaitSet
的集合用來存放調用了await
的線程。如果你之前對ReentrantLock
有深入了解,那理解起monitor
應該是很簡單。
回到代碼上,開始分析EnterI
方法:
void?ATTR?ObjectMonitor::EnterI?(TRAPS)?{ ????Thread?*?Self?=?THREAD?; ????... ????//?嘗試獲得鎖 ????if?(TryLock?(Self)?>?0)?{ ????????... ????????return?; ????} ????DeferredInitialize?()?; ? //?自旋 ????if?(TrySpin?(Self)?>?0)?{ ????????... ????????return?; ????} ???? ????... ????//?將線程封裝成node節點中 ????ObjectWaiter?node(Self)?; ????Self->_ParkEvent->reset()?; ????node._prev???=?(ObjectWaiter?*)?0xBAD?; ????node.TState??=?ObjectWaiter::TS_CXQ?; ????//?將node節點插入到_cxq隊列的頭部,cxq是一個單向鏈表 ????ObjectWaiter?*?nxt?; ????for?(;;)?{ ????????node._next?=?nxt?=?_cxq?; ????????if?(Atomic::cmpxchg_ptr?(&node,?&_cxq,?nxt)?==?nxt)?break?; ????????//?CAS失敗的話?再嘗試獲得鎖,這樣可以降低插入到_cxq隊列的頻率 ????????if?(TryLock?(Self)?>?0)?{ ????????????... ????????????return?; ????????} ????} //?SyncFlags默認為0,如果沒有其他等待的線程,則將_Responsible設置為自己 ????if?((SyncFlags?&?16)?==?0?&&?nxt?==?NULL?&&?_EntryList?==?NULL)?{ ????????Atomic::cmpxchg_ptr?(Self,?&_Responsible,?NULL)?; ????} ????TEVENT?(Inflated?enter?-?Contention)?; ????int?nWakeups?=?0?; ????int?RecheckInterval?=?1?; ????for?(;;)?{ ????????if?(TryLock?(Self)?>?0)?break?; ????????assert?(_owner?!=?Self,?"invariant")?; ????????... ????????//?park?self ????????if?(_Responsible?==?Self?||?(SyncFlags?&?1))?{ ????????????//?當前線程是_Responsible時,調用的是帶時間參數的park ????????????TEVENT?(Inflated?enter?-?park?TIMED)?; ????????????Self->_ParkEvent->park?((jlong)?RecheckInterval)?; ????????????//?Increase?the?RecheckInterval,?but?clamp?the?value. ????????????RecheckInterval?*=?8?; ????????????if?(RecheckInterval?>?1000)?RecheckInterval?=?1000?; ????????}?else?{ ????????????//否則直接調用park掛起當前線程 ????????????TEVENT?(Inflated?enter?-?park?UNTIMED)?; ????????????Self->_ParkEvent->park()?; ????????} ????????if?(TryLock(Self)?>?0)?break?; ????????... ???????? ????????if?((Knob_SpinAfterFutile?&?1)?&&?TrySpin?(Self)?>?0)?break?; ??????? ... ????????//?在釋放鎖時,_succ會被設置為EntryList或_cxq中的一個線程 ????????if?(_succ?==?Self)?_succ?=?NULL?; ????????//?Invariant:?after?clearing?_succ?a?thread?*must*?retry?_owner?before?parking. ????????OrderAccess::fence()?; ????} ???//?走到這里說明已經獲得鎖了 ????assert?(_owner?==?Self??????,?"invariant")?; ????assert?(object()?!=?NULL????,?"invariant")?; ?? //?將當前線程的node從cxq或EntryList中移除 ????UnlinkAfterAcquire?(Self,?&node)?; ????if?(_succ?==?Self)?_succ?=?NULL?; if?(_Responsible?==?Self)?{ ????????_Responsible?=?NULL?; ????????OrderAccess::fence(); ????} ????... ????return?; }
主要步驟有3步:
將當前線程插入到cxq隊列的隊首
然后park當前線程
當被喚醒后再嘗試獲得鎖
這里需要特別說明的是_Responsible
和_succ
兩個字段的作用:
當競爭發生時,選取一個線程作為_Responsible
,_Responsible
線程調用的是有時間限制的park
方法,其目的是防止出現擱淺
現象。
_succ
線程是在線程釋放鎖是被設置,其含義是Heir presumptive
,也就是我們上面說的假定繼承人。
重量級鎖釋放的代碼在ObjectMonitor::exit
:
void?ATTR?ObjectMonitor::exit(bool?not_suspended,?TRAPS)?{ ???Thread?*?Self?=?THREAD?; ???//?如果_owner不是當前線程 ???if?(THREAD?!=?_owner)?{ ?????//?當前線程是之前持有輕量級鎖的線程。由輕量級鎖膨脹后還沒調用過enter方法,_owner會是指向Lock?Record的指針。 ?????if?(THREAD->is_lock_owned((address)?_owner))?{ ???????assert?(_recursions?==?0,?"invariant")?; ???????_owner?=?THREAD?; ???????_recursions?=?0?; ???????OwnerIsThread?=?1?; ?????}?else?{ ???????//?異常情況:當前不是持有鎖的線程 ???????TEVENT?(Exit?-?Throw?IMSX)?; ???????assert(false,?"Non-balanced?monitor?enter/exit!"); ???????if?(false)?{ ??????????THROW(vmSymbols::java_lang_IllegalMonitorStateException()); ???????} ???????return; ?????} ???} ???//?重入計數器還不為0,則計數器-1后返回 ???if?(_recursions?!=?0)?{ ?????_recursions--;????????//?this?is?simple?recursive?enter ?????TEVENT?(Inflated?exit?-?recursive)?; ?????return?; ???} ???//?_Responsible設置為null ???if?((SyncFlags?&?4)?==?0)?{ ??????_Responsible?=?NULL?; ???} ???... ???for?(;;)?{ ??????assert?(THREAD?==?_owner,?"invariant")?; ??????//?Knob_ExitPolicy默認為0 ??????if?(Knob_ExitPolicy?==?0)?{ ?????????//?code?1:先釋放鎖,這時如果有其他線程進入同步塊則能獲得鎖 ?????????OrderAccess::release_store_ptr?(&_owner,?NULL)?;???//?drop?the?lock ?????????OrderAccess::storeload()?;?????????????????????????//?See?if?we?need?to?wake?a?successor ?????????//?code?2:如果沒有等待的線程或已經有假定繼承人 ?????????if?((intptr_t(_EntryList)|intptr_t(_cxq))?==?0?||?_succ?!=?NULL)?{ ????????????TEVENT?(Inflated?exit?-?simple?egress)?; ????????????return?; ?????????} ?????????TEVENT?(Inflated?exit?-?complex?egress)?; ?????????//?code?3:要執行之后的操作需要重新獲得鎖,即設置_owner為當前線程 ?????????if?(Atomic::cmpxchg_ptr?(THREAD,?&_owner,?NULL)?!=?NULL)?{ ????????????return?; ?????????} ?????????TEVENT?(Exit?-?Reacquired)?; ??????}? ??????... ??????ObjectWaiter?*?w?=?NULL?; ??????//?code?4:根據QMode的不同會有不同的喚醒策略,默認為0 ??????int?QMode?=?Knob_QMode?; ? ??????if?(QMode?==?2?&&?_cxq?!=?NULL)?{ ??????????//?QMode?==?2?:?cxq中的線程有更高優先級,直接喚醒cxq的隊首線程 ??????????w?=?_cxq?; ??????????assert?(w?!=?NULL,?"invariant")?; ??????????assert?(w->TState?==?ObjectWaiter::TS_CXQ,?"Invariant")?; ??????????ExitEpilog?(Self,?w)?; ??????????return?; ??????} ??????if?(QMode?==?3?&&?_cxq?!=?NULL)?{ ??????????//?將cxq中的元素插入到EntryList的末尾 ??????????w?=?_cxq?; ??????????for?(;;)?{ ?????????????assert?(w?!=?NULL,?"Invariant")?; ?????????????ObjectWaiter?*?u?=?(ObjectWaiter?*)?Atomic::cmpxchg_ptr?(NULL,?&_cxq,?w)?; ?????????????if?(u?==?w)?break?; ?????????????w?=?u?; ??????????} ??????????assert?(w?!=?NULL??????????????,?"invariant")?; ??????????ObjectWaiter?*?q?=?NULL?; ??????????ObjectWaiter?*?p?; ??????????for?(p?=?w?;?p?!=?NULL?;?p?=?p->_next)?{ ??????????????guarantee?(p->TState?==?ObjectWaiter::TS_CXQ,?"Invariant")?; ??????????????p->TState?=?ObjectWaiter::TS_ENTER?; ??????????????p->_prev?=?q?; ??????????????q?=?p?; ??????????} ??????????//?Append?the?RATs?to?the?EntryList ??????????//?TODO:?organize?EntryList?as?a?CDLL?so?we?can?locate?the?tail?in?constant-time. ??????????ObjectWaiter?*?Tail?; ??????????for?(Tail?=?_EntryList?;?Tail?!=?NULL?&&?Tail->_next?!=?NULL?;?Tail?=?Tail->_next)?; ??????????if?(Tail?==?NULL)?{ ??????????????_EntryList?=?w?; ??????????}?else?{ ??????????????Tail->_next?=?w?; ??????????????w->_prev?=?Tail?; ??????????} ??????????//?Fall?thru?into?code?that?tries?to?wake?a?successor?from?EntryList ??????} ??????if?(QMode?==?4?&&?_cxq?!=?NULL)?{ ??????????//?將cxq插入到EntryList的隊首 ??????????w?=?_cxq?; ??????????for?(;;)?{ ?????????????assert?(w?!=?NULL,?"Invariant")?; ?????????????ObjectWaiter?*?u?=?(ObjectWaiter?*)?Atomic::cmpxchg_ptr?(NULL,?&_cxq,?w)?; ?????????????if?(u?==?w)?break?; ?????????????w?=?u?; ??????????} ??????????assert?(w?!=?NULL??????????????,?"invariant")?; ??????????ObjectWaiter?*?q?=?NULL?; ??????????ObjectWaiter?*?p?; ??????????for?(p?=?w?;?p?!=?NULL?;?p?=?p->_next)?{ ??????????????guarantee?(p->TState?==?ObjectWaiter::TS_CXQ,?"Invariant")?; ??????????????p->TState?=?ObjectWaiter::TS_ENTER?; ??????????????p->_prev?=?q?; ??????????????q?=?p?; ??????????} ??????????//?Prepend?the?RATs?to?the?EntryList ??????????if?(_EntryList?!=?NULL)?{ ??????????????q->_next?=?_EntryList?; ??????????????_EntryList->_prev?=?q?; ??????????} ??????????_EntryList?=?w?; ??????????//?Fall?thru?into?code?that?tries?to?wake?a?successor?from?EntryList ??????} ??????w?=?_EntryList??; ??????if?(w?!=?NULL)?{ ??????????//?如果EntryList不為空,則直接喚醒EntryList的隊首元素 ??????????assert?(w->TState?==?ObjectWaiter::TS_ENTER,?"invariant")?; ??????????ExitEpilog?(Self,?w)?; ??????????return?; ??????} ??????//?EntryList為null,則處理cxq中的元素 ??????w?=?_cxq?; ??????if?(w?==?NULL)?continue?; ??????//?因為之后要將cxq的元素移動到EntryList,所以這里將cxq字段設置為null ??????for?(;;)?{ ??????????assert?(w?!=?NULL,?"Invariant")?; ??????????ObjectWaiter?*?u?=?(ObjectWaiter?*)?Atomic::cmpxchg_ptr?(NULL,?&_cxq,?w)?; ??????????if?(u?==?w)?break?; ??????????w?=?u?; ??????} ??????TEVENT?(Inflated?exit?-?drain?cxq?into?EntryList)?; ??????assert?(w?!=?NULL??????????????,?"invariant")?; ??????assert?(_EntryList??==?NULL????,?"invariant")?; ??????if?(QMode?==?1)?{ ?????????//?QMode?==?1?:?將cxq中的元素轉移到EntryList,并反轉順序 ?????????ObjectWaiter?*?s?=?NULL?; ?????????ObjectWaiter?*?t?=?w?; ?????????ObjectWaiter?*?u?=?NULL?; ?????????while?(t?!=?NULL)?{ ?????????????guarantee?(t->TState?==?ObjectWaiter::TS_CXQ,?"invariant")?; ?????????????t->TState?=?ObjectWaiter::TS_ENTER?; ?????????????u?=?t->_next?; ?????????????t->_prev?=?u?; ?????????????t->_next?=?s?; ?????????????s?=?t; ?????????????t?=?u?; ?????????} ?????????_EntryList??=?s?; ?????????assert?(s?!=?NULL,?"invariant")?; ??????}?else?{ ?????????//?QMode?==?0?or?QMode?==?2‘ ?????????//?將cxq中的元素轉移到EntryList ?????????_EntryList?=?w?; ?????????ObjectWaiter?*?q?=?NULL?; ?????????ObjectWaiter?*?p?; ?????????for?(p?=?w?;?p?!=?NULL?;?p?=?p->_next)?{ ?????????????guarantee?(p->TState?==?ObjectWaiter::TS_CXQ,?"Invariant")?; ?????????????p->TState?=?ObjectWaiter::TS_ENTER?; ?????????????p->_prev?=?q?; ?????????????q?=?p?; ?????????} ??????} ??????//?_succ不為null,說明已經有個繼承人了,所以不需要當前線程去喚醒,減少上下文切換的比率 ??????if?(_succ?!=?NULL)?continue; ??????w?=?_EntryList??; ??????//?喚醒EntryList第一個元素 ??????if?(w?!=?NULL)?{ ??????????guarantee?(w->TState?==?ObjectWaiter::TS_ENTER,?"invariant")?; ??????????ExitEpilog?(Self,?w)?; ??????????return?; ??????} ???} }
在進行必要的鎖重入判斷以及自旋優化后,進入到主要邏輯:
code 1
?設置owner為null,即釋放鎖,這個時刻其他的線程能獲取到鎖。這里是一個非公平鎖的優化;
code 2
?如果當前沒有等待的線程則直接返回就好了,因為不需要喚醒其他線程。或者如果說succ不為null,代表當前已經有個”醒著的”繼承人線程,那當前線程不需要喚醒任何線程;
code 3
?當前線程重新獲得鎖,因為之后要操作cxq和EntryList隊列以及喚醒線程;
code 4
根據QMode的不同,會執行不同的喚醒策略;
根據QMode的不同,有不同的處理方式:
QMode = 2且cxq非空:取cxq隊列隊首的ObjectWaiter對象,調用ExitEpilog方法,該方法會喚醒ObjectWaiter對象的線程,然后立即返回,后面的代碼不會執行了;
QMode = 3且cxq非空:把cxq隊列插入到EntryList的尾部;
QMode = 4且cxq非空:把cxq隊列插入到EntryList的頭部;
QMode = 0:暫時什么都不做,繼續往下看;
只有QMode=2的時候會提前返回,等于0、3、4的時候都會繼續往下執行:
1.如果EntryList的首元素非空,就取出來調用ExitEpilog方法,該方法會喚醒ObjectWaiter對象的線程,然后立即返回;
2.如果EntryList的首元素為空,就將cxq的所有元素放入到EntryList中,然后再從EntryList中取出來隊首元素執行ExitEpilog方法,然后立即返回;
QMode默認為0,結合上面的流程我們可以看這么個demo:
public?class?SyncDemo?{ ????public?static?void?main(String[]?args)?{ ????????SyncDemo?syncDemo1?=?new?SyncDemo(); ????????syncDemo1.startThreadA(); ????????try?{ ????????????Thread.sleep(100); ????????}?catch?(InterruptedException?e)?{ ????????????e.printStackTrace(); ????????} ????????syncDemo1.startThreadB(); ????????try?{ ????????????Thread.sleep(100); ????????}?catch?(InterruptedException?e)?{ ????????????e.printStackTrace(); ????????} ????????syncDemo1.startThreadC(); ??????? ????} ????final?Object?lock?=?new?Object(); ????public?void?startThreadA()?{ ????????new?Thread(()?->?{ ????????????synchronized?(lock)?{ ????????????????System.out.println("A?get?lock"); ????????????????try?{ ????????????????????Thread.sleep(500); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????????System.out.println("A?release?lock"); ????????????} ????????},?"thread-A").start(); ????} ????public?void?startThreadB()?{ ????????new?Thread(()?->?{ ????????????synchronized?(lock)?{ ????????????????System.out.println("B?get?lock"); ????????????} ????????},?"thread-B").start(); ????} ????public?void?startThreadC()?{ ????????new?Thread(()?->?{ ????????????synchronized?(lock)?{ ????????????????System.out.println("C?get?lock"); ????????????} ????????},?"thread-C").start(); ????} }
默認策略下,在A釋放鎖后一定是C線程先獲得鎖。因為在獲取鎖時,是將當前線程插入到cxq的頭部,而釋放鎖時,默認策略是:如果EntryList為空,則將cxq中的元素按原有順序插入到到EntryList,并喚醒第一個線程。也就是當EntryList為空時,是后來的線程先獲取鎖。這點JDK中的Lock機制是不一樣的。
原理弄清楚了,順便總結了幾點Synchronized和ReentrantLock的區別:
Synchronized是JVM層次的鎖實現,ReentrantLock是JDK層次的鎖實現;
Synchronized的鎖狀態是無法在代碼中直接判斷的,但是ReentrantLock可以通過ReentrantLock#isLocked
判斷;
Synchronized是非公平鎖,ReentrantLock是可以是公平也可以是非公平的;
Synchronized是不可以被中斷的,而ReentrantLock#lockInterruptibly
方法是可以被中斷的;
在發生異常時Synchronized會自動釋放鎖(由javac編譯時自動實現),而ReentrantLock需要開發者在finally塊中顯示釋放鎖;
ReentrantLock獲取鎖的形式有多種:如立即返回是否成功的tryLock(),以及等待指定時長的獲取,更加靈活;
Synchronized在特定的情況下對于已經在等待的線程是后來的線程先獲得鎖(上文有說),而ReentrantLock對于已經在等待的線程一定是先來的線程先獲得鎖;
總的來說Synchronized的重量級鎖和ReentrantLock的實現上還是有很多相似的,包括其數據結構、掛起線程方式等等。在日常使用中,如無特殊要求用Synchronized就夠了。你深入了解這兩者其中一個的實現,了解另外一個或其他鎖機制都比較容易,這也是我們常說的技術上的相通性。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。