您好,登錄后才能下訂單哦!
本篇內容主要講解“sync.Pool的實現原理是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“sync.Pool的實現原理是什么”吧!
對象的創建和銷毀會消耗一定的系統資源(內存,gc等),過多的創建銷毀對象會帶來內存不穩定與更長的gc停頓,因為go的gc不存在分代,因而更加不擅長處理這種問題。因而go早早就推出Pool包用于緩解這種情況。Pool用于核心的功能就是Put和Get。當我們需要一個對象的時候通過Get獲取一個,創建的對象也可以Put放進池子里,通過這種方式可以反復利用現有對象,這樣gc就不用高頻的促發內存gc了。
type Pool struct { noCopy noCopy local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. New func() interface{} }
創建時候指定New方法用于創建默認對象,local,localSize會在隨后用到的時候生成. local是一個poolLocalInternal的切片指針。
type poolLocalInternal struct { private interface{} // Can be used only by the respective P. shared []interface{} // Can be used by any P. Mutex // Protects shared. }
當不同的p調用Pool時,每個p都會在local上分配這樣一個poolLocal,索引值就是p的id。 private存放的對象只能由創建的p讀寫,shared則會在多個p之間共享。
// Put adds x to the pool. func (p *Pool) Put(x interface{}) { if x == nil { return } if race.Enabled { if fastrand()%4 == 0 { // Randomly drop x on floor. return } race.ReleaseMerge(poolRaceAddr(x)) race.Disable() } l := p.pin() if l.private == nil { l.private = x x = nil } runtime_procUnpin() if x != nil { l.Lock() l.shared = append(l.shared, x) l.Unlock() } if race.Enabled { race.Enable() } }
Put先要通過pin函數獲取當前Pool對應的pid位置上的localPool,然后檢查private是否存在,存在則設置到private上,如果不存在就追加到shared尾部。
func (p *Pool) pin() *poolLocal { pid := runtime_procPin() // In pinSlow we store to localSize and then to local, here we load in opposite order. // Since we've disabled preemption, GC cannot happen in between. // Thus here we must observe local at least as large localSize. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { // 這句話的意思是如果當前pool的localPool切片尚未創建,尚未創建這句話肯定是false的 return indexLocal(l, pid) } return p.pinSlow() }
pin函數先通過自旋加鎖(可以避免p自身發生并發),在檢查本地local切片的size,size大于當前pid則使用pid去本地local切片上索引到localpool對象,否則就要走pinSlow對象創建本地localPool切片了.
func (p *Pool) pinSlow() *poolLocal { // Retry under the mutex. // Can not lock the mutex while pinned. runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() // poolCleanup won't be called while we are pinned. s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid) } if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local[pid] }
pinShow先要取消自旋鎖,因為后面的lock內部也會嘗試自旋鎖,下面可能會操作allpool因而這里需要使用互斥鎖allPoolsMu,然后又加上自旋鎖,(這里注釋說不會發生poolCleanup,但是查看代碼gcstart只是查看了當前m的lock狀態,然而避免不了其他m觸發的gc,尚存疑),這里會再次嘗試之前的操作,因為可能在unpin,pin之間有并發產生了poolocal,確認本地local切片是空的才會生成一個新的pool。后面是創建Pool上的localPool切片,runtime.GOMAXPROCS這里的作用是返回p的數量,用于確定pool的localpool的數量.
func (p *Pool) Get() interface{} { if race.Enabled { race.Disable() } l := p.pin() x := l.private l.private = nil runtime_procUnpin() if x == nil { l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] } l.Unlock() if x == nil { x = p.getSlow() } } if race.Enabled { race.Enable() if x != nil { race.Acquire(poolRaceAddr(x)) } } if x == nil && p.New != nil { x = p.New() } return x }
GET 先調用pin獲取本地local,這個具體流程和上面一樣了,如果當前private存在返回private上面的對象,如果不存在就從shared查找,存在返回尾部對象,反之就要從其他的p的localPool里面偷了。
func (p *Pool) getSlow() (x interface{}) { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire local := p.local // load-consume // Try to steal one element from other procs. pid := runtime_procPin() runtime_procUnpin() for i := 0; i < int(size); i++ { l := indexLocal(local, (pid+i+1)%int(size)) l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] l.Unlock() break } l.Unlock() } return x }
首先就要獲取當前size,用于輪詢p的local,這里的查詢順序不是從0開始,而是是從當前p的位置往后查一圈。查到依次檢查每個p的shared上是否存在對象,如果存在就獲取末尾的值。 如果所有p的poollocal都是空的,那么初始化的New函數就起作用了,調用這個New函數創建一個新的對象出來。
func poolCleanup() { // This function is called with the world stopped, at the beginning of a garbage collection. // It must not allocate and probably should not call any runtime functions. // Defensively zero out everything, 2 reasons: // 1. To prevent false retention of whole Pools. // 2. If GC happens while a goroutine works with l.shared in Put/Get, // it will retain whole Pool. So next cycle memory consumption would be doubled. for i, p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = []*Pool{} }
pool對象的清理是在每次gc之前清理,通過runtime_registerPoolCleanup函數注冊一個上面的poolCleanup對象,內部會把這個函數設置到clearpool函數上面,然后每次gc之前會調用clearPool來取消所有pool的引用,重置所有的Pool。代碼很簡單就是輪詢一邊設置nil,然后取消所有poollocal,pool引用。方法簡單粗暴。由于clearPool是在STW中調用的,如果Pool存在大量對象會拉長STW的時間,在已經有提案來修復這個問題了(CL 166961.)[https://go-review.googlesource.com/c/go/+/166961/]
到此,相信大家對“sync.Pool的實現原理是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。