您好,登錄后才能下訂單哦!
本篇內容介紹了“Semaphore的原理和實現方法”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
// Go 語言中暴露的 semaphore 實現 // 具體的用法是提供 sleep 和 wakeup 原語 // 以使其能夠在其它同步原語中的競爭情況下使用 // 因此這里的 semaphore 和 Linux 中的 futex 目標是一致的 // 只不過語義上更簡單一些 // // 也就是說,不要認為這些是信號量 // 把這里的東西看作 sleep 和 wakeup 實現的一種方式 // 每一個 sleep 都會和一個 wakeup 配對 // 即使在發生 race 時,wakeup 在 sleep 之前時也是如此 // // See Mullender and Cox, ``Semaphores in Plan 9,'' // http://swtch.com/semaphore.pdf // 為 sync.Mutex 準備的異步信號量 // semaRoot 持有一棵 地址各不相同的 sudog(s.elem) 的平衡樹 // 每一個 sudog 都反過來指向(通過 s.waitlink)一個在同一個地址上等待的其它 sudog 們 // 同一地址的 sudog 的內部列表上的操作時間復雜度都是 O(1)。頂層 semaRoot 列表的掃描 // 的時間復雜度是 O(log n),n 是被哈希到同一個 semaRoot 的不同地址的總數,每一個地址上都會有一些 goroutine 被阻塞。 // 訪問 golang.org/issue/17953 來查看一個在引入二級列表之前性能較差的程序樣例,test/locklinear.go // 中有一個復現這個樣例的測試 type semaRoot struct { lock mutex treap *sudog // root of balanced tree of unique waiters. nwait uint32 // Number of waiters. Read w/o the lock. } // Prime to not correlate with any user patterns. const semTabSize = 251 var semtable [semTabSize]struct { root semaRoot pad [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte } func semroot(addr *uint32) *semaRoot { return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root }
┌─────┬─────┬─────┬─────┬─────┬────────────────────────┬─────┐ │ 0 │ 1 │ 2 │ 3 │ 4 │ ..... │ 250 │ └─────┴─────┴─────┴─────┴─────┴────────────────────────┴─────┘ │ │ │ │ └──┐ └─┐ │ │ │ │ ▼ ▼ ┌─────────┐ ┌─────────┐ │ struct │ │ struct │ ├─────────┴─────────┐ ├─────────┴─────────┐ │ root semaRoot │──┐ │ root semaRoot │──┐ ├───────────────────┤ │ ├───────────────────┤ │ │ pad │ │ │ pad │ │ └───────────────────┘ │ └───────────────────┘ │ │ │ ┌────────────────┘ ┌────────────────┘ │ │ │ │ ▼ ▼ ┌──────────┐ ┌──────────┐ │ semaRoot │ │ semaRoot │ ├──────────┴────────┐ ├──────────┴────────┐ │ lock mutex │ │ lock mutex │ ├───────────────────┤ ├───────────────────┤ │ treap *sudog │ │ treap *sudog │ ├───────────────────┤ ├───────────────────┤ │ nwait uint32 │ │ nwait uint32 │ └───────────────────┘ └───────────────────┘
treap 結構:
┌──────────┐ ┌─┬─?│ sudog │ │ │ ├──────────┴────────────┐ ┌─────────────────────┼─┼──│ prev *sudog │ │ │ │ ├───────────────────────┤ │ │ │ │ next *sudog │────┐ │ │ │ ├───────────────────────┤ │ │ │ │ │ parent *sudog │ │ │ │ │ ├───────────────────────┤ │ │ │ │ │ elem unsafe.Pointer │ │ │ │ │ ├───────────────────────┤ │ │ │ │ │ ticket uint32 │ │ │ │ │ └───────────────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ ▼ ┌──────────┐ │ │ ┌──────────┐ │ sudog │ │ │ │ sudog │ ├──────────┴────────────┐ │ │ ├──────────┴────────────┐ │ prev *sudog │ │ │ │ prev *sudog │ ├───────────────────────┤ │ │ ├───────────────────────┤ │ next *sudog │ │ │ │ next *sudog │ ├───────────────────────┤ │ │ ├───────────────────────┤ │ parent *sudog │───┘ └─────────────────────────│ parent *sudog │ ├───────────────────────┤ ├───────────────────────┤ │ elem unsafe.Pointer │ │ elem unsafe.Pointer │ ├───────────────────────┤ ├───────────────────────┤ │ ticket uint32 │ │ ticket uint32 │ └───────────────────────┘ └───────────────────────┘
在這個 treap 結構里,從 elem 的視角(其實就是 lock 的 addr)來看,這個結構是個二叉搜索樹。從 ticket 的角度來看,整個結構就是一個小頂堆。
所以才叫樹堆(treap)。
相同 addr,即對同一個 mutex 上鎖的 g,會阻塞在同一個地址上。這些阻塞在同一個地址上的 goroutine 會被打包成 sudog,組成一個鏈表。用 sudog 的 waitlink 相連:
┌──────────┐ ┌──────────┐ ┌──────────┐ │ sudog │ ┌─────?│ sudog │ ┌─────?│ sudog │ ├──────────┴────────────┐ │ ├──────────┴────────────┐ │ ├──────────┴────────────┐ │ waitlink *sudog │─────┘ │ waitlink *sudog │──────┘ │ waitlink *sudog │ ├───────────────────────┤ ├───────────────────────┤ ├───────────────────────┤ │ waittail *sudog │ │ waittail *sudog │ │ waittail *sudog │ └───────────────────────┘ └───────────────────────┘ └───────────────────────┘
中間的元素的 waittail 都會指向最后一個元素:
┌──────────┐ │ sudog │ ├──────────┴────────────┐ │ waitlink *sudog │ ├───────────────────────┤ │ waittail *sudog │───────────────────────────────────────────────────────────┐ └───────────────────────┘ │ ┌──────────┐ │ │ sudog │ │ ├──────────┴────────────┐ │ │ waitlink *sudog │ │ ├───────────────────────┤ │ │ waittail *sudog │─────────────────────────┤ └───────────────────────┘ ▼ ┌──────────┐ │ sudog │ ├──────────┴────────────┐ │ waitlink *sudog │ ├───────────────────────┤ │ waittail *sudog │ └───────────────────────┘
對外封裝
在 sema.go 里實現的內容,用 go:linkname 導出給 sync、poll 庫來使用,也是在鏈接期做了些手腳:
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire func sync_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile) } //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire func poll_runtime_Semacquire(addr *uint32) { semacquire1(addr, false, semaBlockProfile) } //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool) { semrelease1(addr, handoff) } //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile) } //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease func poll_runtime_Semrelease(addr *uint32) { semrelease(addr) }
sem 本身支持 acquire 和 release,其實就是 OS 里常說的 P 操作和 V 操作。
func cansemacquire(addr *uint32) bool { for { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } } }
type semaProfileFlags int const ( semaBlockProfile semaProfileFlags = 1 << iota semaMutexProfile ) // Called from runtime. func semacquire(addr *uint32) { semacquire1(addr, false, 0) } func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) { gp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") } // 低成本的情況 if cansemacquire(addr) { return } // 高成本的情況: // 增加 waiter count 的值 // 再嘗試調用一次 cansemacquire,成本了就直接返回 // 沒成功就把自己作為一個 waiter 入隊 // sleep // (之后 waiter 的 descriptor 被 signaler 用 dequeue 踢出) s := acquireSudog() root := semroot(addr) t0 := int64(0) s.releasetime = 0 s.acquiretime = 0 s.ticket = 0 for { lock(&root.lock) // 給 nwait 加一,這樣后來的就不會在 semrelease 中進低成本的路徑了 atomic.Xadd(&root.nwait, 1) // 檢查 cansemacquire 避免錯過了喚醒 if cansemacquire(addr) { atomic.Xadd(&root.nwait, -1) unlock(&root.lock) break } // 在 cansemacquire 之后的 semrelease 都可以知道我們正在等待 // (上面設置了 nwait),所以會直接進入 sleep // 注: 這里說的 sleep 其實就是 goparkunlock root.queue(addr, s, lifo) goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4) if s.ticket != 0 || cansemacquire(addr) { break } } if s.releasetime > 0 { blockevent(s.releasetime-t0, 3) } releaseSudog(s) }
func semrelease(addr *uint32) { semrelease1(addr, false) } func semrelease1(addr *uint32, handoff bool) { root := semroot(addr) atomic.Xadd(addr, 1) // 低成本情況: 沒有 waiter? // 這個 atomic 的檢查必須發生在 xadd 之前,以避免錯誤喚醒 // (具體參見 semacquire 中的循環) if atomic.Load(&root.nwait) == 0 { return } // 高成本情況: 搜索 waiter 并喚醒它 lock(&root.lock) if atomic.Load(&root.nwait) == 0 { // count 值已經被另一個 goroutine 消費了 // 所以我們不需要喚醒其它 goroutine 了 unlock(&root.lock) return } s, t0 := root.dequeue(addr) if s != nil { atomic.Xadd(&root.nwait, -1) } unlock(&root.lock) if s != nil { // 可能會很慢,所以先解鎖 acquiretime := s.acquiretime if acquiretime != 0 { mutexevent(t0-acquiretime, 3) } if s.ticket != 0 { throw("corrupted semaphore ticket") } if handoff && cansemacquire(addr) { s.ticket = 1 } readyWithTime(s, 5) } } func readyWithTime(s *sudog, traceskip int) { if s.releasetime != 0 { s.releasetime = cputicks() } goready(s.g, traceskip) }
sudog 按照地址 hash 到 251 個 bucket 中的其中一個,每一個 bucket 都是一棵 treap。而相同 addr 上的 sudog 會形成一個鏈表。
為啥同一個地址的 sudog 不需要展開放在 treap 中呢?顯然,sudog 喚醒的時候,block 在同一個 addr 上的 goroutine,說明都是加的同一把鎖,這些 goroutine 被喚醒肯定是一起被喚醒的,相同地址的 g 并不需要查找才能找到,只要決定是先進隊列的被喚醒(fifo)還是后進隊列的被喚醒(lifo)就可以了。
// queue 函數會把 s 添加到 semaRoot 上阻塞的 goroutine 們中 // 實際上就是把 s 添加到其地址對應的 treap 上 func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) { s.g = getg() s.elem = unsafe.Pointer(addr) s.next = nil s.prev = nil var last *sudog pt := &root.treap for t := *pt; t != nil; t = *pt { if t.elem == unsafe.Pointer(addr) { // Already have addr in list. if lifo { // treap 中在 t 的位置用 s 覆蓋掉 t *pt = s s.ticket = t.ticket s.acquiretime = t.acquiretime s.parent = t.parent s.prev = t.prev s.next = t.next if s.prev != nil { s.prev.parent = s } if s.next != nil { s.next.parent = s } // 把 t 放在 s 的 wait list 的第一個位置 s.waitlink = t s.waittail = t.waittail if s.waittail == nil { s.waittail = t } t.parent = nil t.prev = nil t.next = nil t.waittail = nil } else { // 把 s 添加到 t 的等待列表的末尾 if t.waittail == nil { t.waitlink = s } else { t.waittail.waitlink = s } t.waittail = s s.waitlink = nil } return } last = t if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) { pt = &t.prev } else { pt = &t.next } } // 把 s 作為樹的新的葉子插入進去 // 平衡樹使用 ticket 作為堆的權重值,這個 ticket 是隨機生成的 // 也就是說,這個結構以元素地址來看的話,是一個二叉搜索樹 // 同時用 ticket 值使其同時又是一個小頂堆,滿足 // s.ticket <= both s.prev.ticket and s.next.ticket. // https://en.wikipedia.org/wiki/Treap // http://faculty.washington.edu/aragon/pubs/rst89.pdf // // s.ticket 會在一些地方和 0 相比,因此只設置最低位的 bit // 這樣不會明顯地影響 treap 的質量? s.ticket = fastrand() | 1 s.parent = last *pt = s // 按照 ticket 來進行旋轉,以滿足 treap 的性質 for s.parent != nil && s.parent.ticket > s.ticket { if s.parent.prev == s { root.rotateRight(s.parent) } else { if s.parent.next != s { panic("semaRoot queue") } root.rotateLeft(s.parent) } } } // dequeue 會搜索到阻塞在 addr 地址的 semaRoot 中的第一個 goroutine // 如果這個 sudog 需要進行 profile,dequeue 會返回它被喚醒的時間(now),否則的話 now 為 0 func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) { ps := &root.treap s := *ps for ; s != nil; s = *ps { if s.elem == unsafe.Pointer(addr) { goto Found } if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) { ps = &s.prev } else { ps = &s.next } } return nil, 0 Found: now = int64(0) if s.acquiretime != 0 { now = cputicks() } if t := s.waitlink; t != nil { // 替換掉同樣在 addr 上等待的 t。 *ps = t t.ticket = s.ticket t.parent = s.parent t.prev = s.prev if t.prev != nil { t.prev.parent = t } t.next = s.next if t.next != nil { t.next.parent = t } if t.waitlink != nil { t.waittail = s.waittail } else { t.waittail = nil } t.acquiretime = now s.waitlink = nil s.waittail = nil } else { // 向下旋轉 s 到葉節點,以進行刪除,同時要考慮優先級 for s.next != nil || s.prev != nil { if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket { root.rotateRight(s) } else { root.rotateLeft(s) } } // Remove s, now a leaf. // 刪除 s,現在是葉子節點了 if s.parent != nil { if s.parent.prev == s { s.parent.prev = nil } else { s.parent.next = nil } } else { root.treap = nil } } s.parent = nil s.elem = nil s.next = nil s.prev = nil s.ticket = 0 return s, now }
“Semaphore的原理和實現方法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。