This article walks through the source of StateSync.go under Ethereum's Downloader module. The content is fairly detailed; interested readers can use it as a reference, and hopefully it will be of some help.
statesync is the module that synchronizes state. Its most direct tie to the rest of the system is the stateRoot in a block: that stateRoot is exactly what the statesync module downloads.
To begin at the beginning, here is the data structure of stateSync itself.
// stateSync schedules requests for downloading a particular state trie defined
// by a given state root.
// stateSync schedules requests to download the state trie rooted at a given state root.
type stateSync struct {
    d *Downloader // Downloader instance to access and manage current peerset (embeds the downloader module)

    // the scheduler
    sched  *trie.Sync // State trie sync scheduler defining the tasks
    keccak hash.Hash  // Keccak256 hasher to verify deliveries with
    // queue of request tasks
    tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval

    // bookkeeping counters
    numUncommitted   int
    bytesUncommitted int

    // channels for handling data
    deliver    chan *stateReq // Delivery channel multiplexing peer responses
    cancel     chan struct{}  // Channel to signal a termination request
    cancelOnce sync.Once      // Ensures cancel only ever gets called once
    done       chan struct{}  // Channel to signal termination completion
    err        error          // Any error hit during sync (set before completion)
}
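Two of those fields deserve a quick aside: done signals completion and cancelOnce guards the cancel channel. Below is a minimal, self-contained sketch (the canceler type is made up for illustration, not geth's API) of why sync.Once is used here: closing a channel twice panics, and Once makes a second Cancel a no-op.

package main

import (
    "fmt"
    "sync"
)

// canceler is a toy stand-in for the cancel/cancelOnce pair in stateSync.
type canceler struct {
    cancel     chan struct{}
    cancelOnce sync.Once
}

// Cancel closes the cancel channel exactly once, no matter how many
// goroutines call it; a bare close() on the second call would panic.
func (c *canceler) Cancel() {
    c.cancelOnce.Do(func() { close(c.cancel) })
}

func main() {
    c := &canceler{cancel: make(chan struct{})}
    c.Cancel()
    c.Cancel() // second call is a no-op instead of a double-close panic
    <-c.cancel // receiving from a closed channel returns immediately
    fmt.Println("cancel channel closed")
}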
That is the main data structure of stateSync. With that in hand, let's begin the guided tour of the source.
// 1. First, let's look at how the state-sync goroutine is started and how a sync is triggered.
// The goroutine is started in the downloader's New method:

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
    .....................
    go dl.stateFetcher()
    return dl
}

// Once inside stateFetcher, it loops over its channels waiting for sync tasks to arrive.

// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
    for {
        select {
        // read a sync task from the d.stateSyncStart channel
        case s := <-d.stateSyncStart:
            for next := s; next != nil; {
                // call runStateSync to perform the state sync
                next = d.runStateSync(next)
            }
        case <-d.stateCh:
            // Ignore state responses while no sync is running.
        case <-d.quitCh:
            return
        }
    }
}
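To make the control flow concrete, here is a runnable toy version of this pattern (all names are illustrative, not go-ethereum's): a single goroutine owns the sync loop; if a new session arrives while one is running, the runner hands the new session back and the outer loop switches to it, so at most one sync is ever active.

package main

import (
    "fmt"
    "time"
)

type session struct{ id int }

// runSession plays the role of runStateSync: it returns a newer session
// if one shows up mid-run, or nil when the current session is finished.
func runSession(s *session, start <-chan *session) *session {
    fmt.Println("syncing session", s.id)
    select {
    case next := <-start: // preempted by a newer request
        return next
    case <-time.After(50 * time.Millisecond): // pretend the sync finished
        return nil
    }
}

func main() {
    start := make(chan *session)
    done := make(chan struct{})

    go func() { // plays the role of stateFetcher
        for {
            select {
            case s := <-start:
                for next := s; next != nil; {
                    next = runSession(next, start)
                }
            case <-done:
                return
            }
        }
    }()

    start <- &session{id: 1}
    start <- &session{id: 2} // arrives mid-sync and preempts session 1
    time.Sleep(200 * time.Millisecond)
    close(done)
}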
The above covered when the stateFetcher goroutine starts and what it does once running. Now let's see what actually triggers a state sync.
// 1. First, recall processFastSyncContent, the downloader's fast-sync processing method.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
    // Start syncing state of the reported head block. This should get us most of
    // the state of the pivot block.
    // The very first line of processFastSyncContent calls d.syncState,
    // passing in the stateRoot of the best peer's latest block.
    stateSync := d.syncState(latest.Root)
    ...................
}

// 2. Next, let's analyze what d.syncState(latest.Root) does.
func (d *Downloader) syncState(root common.Hash) *stateSync {
    // first build the sync task via newStateSync
    s := newStateSync(d, root)
    select {
    // push the task into the channel polled by stateFetcher, handing it off
    // to stateFetcher, which was covered above
    case d.stateSyncStart <- s:
    case <-d.quitCh:
        s.err = errCancelStateFetch
        close(s.done)
    }
    return s
}

// 3. First, what does newStateSync do?
func newStateSync(d *Downloader, root common.Hash) *stateSync {
    // it simply fills in the stateSync struct; state.NewStateSync is analyzed next
    return &stateSync{
        d:       d,
        sched:   state.NewStateSync(root, d.stateDB),
        keccak:  sha3.NewLegacyKeccak256(),
        tasks:   make(map[common.Hash]*stateTask),
        deliver: make(chan *stateReq),
        cancel:  make(chan struct{}),
        done:    make(chan struct{}),
    }
}

func NewStateSync(root common.Hash, database trie.DatabaseReader) *trie.Sync {
    var syncer *trie.Sync
    // define the leaf callback
    callback := func(leaf []byte, parent common.Hash) error {
        var obj Account
        if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
            return err
        }
        // AddSubTrie is analyzed further below
        syncer.AddSubTrie(obj.Root, 64, parent, nil)
        syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent)
        return nil
    }
    // call trie.NewSync
    syncer = trie.NewSync(root, database, callback)
    return syncer
}

// NewSync creates a new trie data download scheduler.
func NewSync(root common.Hash, database DatabaseReader, callback LeafCallback) *Sync {
    ts := &Sync{
        // backing database
        database: database,
        // in-memory batch
        membatch: newSyncMemBatch(),
        // map of in-flight requests
        requests: make(map[common.Hash]*request),
        // priority queue
        queue: prque.New(nil),
    }
    // AddSubTrie is analyzed below
    ts.AddSubTrie(root, 0, common.Hash{}, callback)
    return ts
}

func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) {
    // bail out if the root is empty
    if root == emptyRoot {
        return
    }
    // if this root is already in the memory batch, return immediately
    if _, ok := s.membatch.batch[root]; ok {
        return
    }
    // try to fetch the node from the local database
    key := root.Bytes()
    blob, _ := s.database.Get(key)
    // if the trie node for this root decodes locally, it already exists, so return
    if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
        return
    }
    // reaching this point, assemble a sync request
    req := &request{
        hash:     root,
        depth:    depth,
        callback: callback,
    }
    // if this sub-trie has a designated parent, link the two together
    if parent != (common.Hash{}) {
        ancestor := s.requests[parent]
        if ancestor == nil {
            panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent))
        }
        ancestor.deps++
        req.parents = append(req.parents, ancestor)
    }
    // hand the request to the scheduler's task queue
    s.schedule(req)
}

func (s *Sync) schedule(req *request) {
    // If we're already requesting this node, add a new reference and stop
    if old, ok := s.requests[req.hash]; ok {
        old.parents = append(old.parents, req.parents...)
        return
    }
    // Schedule the request for future retrieval
    s.queue.Push(req.hash, int64(req.depth))
    s.requests[req.hash] = req
}
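A note on s.queue: prque is, as far as I recall, a max-priority queue, so pushing with int64(req.depth) means deeper trie nodes are popped first, which keeps the in-memory frontier of pending parents small. Here is a toy equivalent built on container/heap (the item type and names are illustrative) showing that retrieval order:

package main

import (
    "container/heap"
    "fmt"
)

// item pairs a node hash with its trie depth, used as priority.
type item struct {
    hash  string
    depth int64
}

// maxHeap implements heap.Interface with larger depths popping first.
type maxHeap []item

func (h maxHeap) Len() int            { return len(h) }
func (h maxHeap) Less(i, j int) bool  { return h[i].depth > h[j].depth } // max-heap order
func (h maxHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *maxHeap) Pop() interface{} {
    old := *h
    n := len(old)
    it := old[n-1]
    *h = old[:n-1]
    return it
}

func main() {
    h := &maxHeap{}
    heap.Push(h, item{"root", 0})
    heap.Push(h, item{"branch", 3})
    heap.Push(h, item{"leaf", 7})

    for h.Len() > 0 {
        fmt.Println(heap.Pop(h).(item).hash) // prints: leaf, branch, root
    }
}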
So far we have seen how a sync task is built, ending with the request queued in the scheduler for retrieval. Next we analyze how sync requests are sent out and how the responses land in the local database.
// 1. We have now built a stateSync struct and pushed it onto d.stateSyncStart;
// stateFetcher listens on that channel and, on receipt, calls d.runStateSync().
// So let's analyze runStateSync.
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
    var (
        active   = make(map[string]*stateReq) // requests currently in flight
        finished []*stateReq                  // requests that completed or failed
        timeout  = make(chan *stateReq)       // requests that timed out
    )
    defer func() {
        // Cancel active request timers on exit. Also set peers to idle so they're
        // available for the next sync.
        for _, req := range active {
            req.timer.Stop()
            req.peer.SetNodeDataIdle(len(req.items))
        }
    }()
    // run the state-sync main loop in its own goroutine; analyzed later
    go s.run()
    defer s.Cancel()

    // listen for peer-drop events so no tasks get assigned to departed peers
    peerDrop := make(chan *peerConnection, 1024)
    peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
    defer peerSub.Unsubscribe()

    // the for loop below handles messages from the various channels
    for {
        // Enable sending of the first buffered element if there is one.
        var (
            deliverReq   *stateReq
            deliverReqCh chan *stateReq
        )
        if len(finished) > 0 {
            deliverReq = finished[0]
            deliverReqCh = s.deliver
        }

        select {
        // if another sync request appears during the state sync, return the new
        // stateSync object so runStateSync gets invoked again with it
        case next := <-d.stateSyncStart:
            return next

        // return once the sync-complete signal arrives
        case <-s.done:
            return nil

        // take the first completed request out of finished, send the result to
        // deliverReqCh, then drop the first element and shrink the finished slice
        case deliverReqCh <- deliverReq:
            // Shift out the first request, but also set the emptied slot to nil for GC
            copy(finished, finished[1:])
            finished[len(finished)-1] = nil
            finished = finished[:len(finished)-1]

        // handle an incoming response packet
        case pack := <-d.stateCh:
            // ignore the response if it is not one we requested
            req := active[pack.PeerId()]
            if req == nil {
                log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
                continue
            }
            // stop this request's timer first
            req.timer.Stop()
            // attach the response payload to the request
            req.response = pack.(*statePack).states
            // add the request to the finished slice
            finished = append(finished, req)
            // and remove it from the in-flight map
            delete(active, pack.PeerId())

        // handle peer-drop messages
        case p := <-peerDrop:
            // if this peer has no outstanding request, skip this round
            req := active[p.id]
            if req == nil {
                continue
            }
            // otherwise stop its timer and set the dropped flag to true
            req.timer.Stop()
            req.dropped = true
            // move the request into finished (which holds both completed and
            // failed requests) and remove this peer's request from active
            finished = append(finished, req)
            delete(active, p.id)

        // handle timed-out requests
        case req := <-timeout:
            // skip if this peer already has a different state request in flight
            if active[req.peer.id] != req {
                continue
            }
            // otherwise treat it as a failed request
            finished = append(finished, req)
            delete(active, req.peer.id)

        // track outgoing state requests
        case req := <-d.trackStateReq:
            // If an active request already exists for this peer, we have a problem. In
            // theory the trie node schedule must never assign two requests to the same
            // peer. In practice however, a peer might receive a request, disconnect and
            // immediately reconnect before the previous times out. In this case the first
            // request is never honored, alas we must not silently overwrite it, as that
            // causes valid requests to go missing and sync to get stuck.
            if old := active[req.peer.id]; old != nil {
                log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
                // Make sure the previous one doesn't get silently lost
                old.timer.Stop()
                old.dropped = true
                finished = append(finished, old)
            }
            // arm a timer for the request
            // Start a timer to notify the sync loop if the peer stalled.
            req.timer = time.AfterFunc(req.timeout, func() {
                select {
                case timeout <- req:
                case <-s.done:
                    // Prevent leaking of timer goroutines in the unlikely case where a
                    // timer is fired just before exiting runStateSync.
                }
            })
            // put the request into the active map
            active[req.peer.id] = req
        }
    }
}
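One detail worth isolating is the deliverReqCh trick at the top of the loop: sending on a nil channel blocks forever, so a nil-channel send case in a select simply never fires. Setting deliverReqCh = s.deliver only when finished is non-empty enables that branch on a per-turn basis. A standalone demo of the idiom (illustrative names, not geth code):

package main

import "fmt"

func main() {
    finished := []string{}
    deliver := make(chan string, 1)

    for turn := 0; turn < 3; turn++ {
        var (
            deliverReq   string
            deliverReqCh chan string // nil => the send case below is disabled
        )
        if len(finished) > 0 {
            deliverReq = finished[0]
            deliverReqCh = deliver
        }
        select {
        case deliverReqCh <- deliverReq: // only reachable when finished is non-empty
            finished = finished[1:]
            fmt.Println("delivered:", <-deliver)
        default:
            fmt.Println("nothing finished yet; queueing one")
            finished = append(finished, fmt.Sprintf("req-%d", turn))
        }
    }
}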
That is the runStateSync code; the inline comments should make it easy to follow. Its whole job is the for loop handling messages from each channel. Next, let's analyze s.run(), the main loop of the state sync itself.
func (s *stateSync) run() {
    s.err = s.loop()
    close(s.done)
}

func (s *stateSync) loop() (err error) {
    // listen for new-peer events so download tasks can be assigned to fresh peers
    newPeer := make(chan *peerConnection, 1024)
    peerSub := s.d.peers.SubscribeNewPeers(newPeer)
    defer peerSub.Unsubscribe()

    defer func() {
        cerr := s.commit(true)
        if err == nil {
            err = cerr
        }
    }()

    // while the scheduler still has pending state requests (the ones queued
    // by s.schedule() above), keep processing
    for s.sched.Pending() > 0 {
        // run a commit first; the parameter says whether the commit is forced
        // (the method is analyzed further below)
        if err = s.commit(false); err != nil {
            return err
        }
        // assign tasks to idle peers
        s.assignTasks()
        // Tasks assigned, wait for something to happen
        select {
        case <-newPeer:
            // New peer arrived, try to assign it download tasks

        case <-s.cancel:
            return errCancelStateFetch

        case <-s.d.cancelCh:
            return errCancelStateFetch

        // handle the results of a state request; the delivery path works like this:
        // we send GetNodeDataMsg to a peer, which replies with NodeDataMsg;
        // ProtocolManager.handleMsg calls pm.downloader.DeliverNodeData(p.id, data),
        // which in turn calls d.deliver(id, d.stateCh, &statePack{id, data},
        // stateInMeter, stateDropMeter), writing the message into d.stateCh.
        // runStateSync's for loop reads d.stateCh and appends the message to the
        // finished slice; at the top of each loop turn it sets
        // deliverReq = finished[0]; deliverReqCh = s.deliver, and the select's
        // `deliverReqCh <- deliverReq` case then hands the result to the code below.
        case req := <-s.deliver:
            // Response, disconnect or timeout triggered, drop the peer if stalling
            log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
            if len(req.items) <= 2 && !req.dropped && req.timedOut() {
                // 2 items are the minimum requested, if even that times out, we've no use of this peer at the moment.
                log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
                s.d.dropPeer(req.peer.id)
            }
            // process the result of the state request
            delivered, err := s.process(req)
            if err != nil {
                log.Warn("Node data write error", "err", err)
                return err
            }
            // mark this peer as idle again
            req.peer.SetNodeDataIdle(delivered)
        }
    }
    return nil
}
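The per-request timer armed back in runStateSync is also worth a standalone look: the time.AfterFunc callback selects between reporting the timeout and observing the done channel, so a timer firing after shutdown cannot block forever on an abandoned channel. A minimal runnable sketch (the req type and the durations are made up for illustration):

package main

import (
    "fmt"
    "time"
)

type req struct{ id string }

func main() {
    timeout := make(chan *req)
    done := make(chan struct{})

    r := &req{id: "peer-1"}
    timer := time.AfterFunc(30*time.Millisecond, func() {
        select {
        case timeout <- r: // normal path: the loop is still listening
        case <-done: // shutdown path: drop the notification instead of blocking
        }
    })
    defer timer.Stop()

    select {
    case tr := <-timeout:
        fmt.Println("request timed out:", tr.id)
    case <-time.After(100 * time.Millisecond):
        fmt.Println("no timeout observed")
    }
    close(done)
}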
That is what the s.run() main loop does. Its main jobs are:

1. Check whether there are pending state requests; if none, run one forced state commit; otherwise proceed with the steps below.

2. Run a non-forced commit first to check memory pressure: if the uncommitted data is still small, do nothing; otherwise commit the state.

3. Assign state-request tasks to idle peers.

4. Process the results of state requests.
commit(bool)
func (s *stateSync) commit(force bool) error {
    // in non-forced mode, return if the uncommitted bytes are under 100 * 1024
    if !force && s.bytesUncommitted < ethdb.IdealBatchSize {
        return nil
    }
    // otherwise start persisting the data
    start := time.Now()
    // create a Batch
    b := s.d.stateDB.NewBatch()
    // move the s.membatch data into the ethdb.Putter
    if written, err := s.sched.Commit(b); written == 0 || err != nil {
        return err
    }
    // call the batch's Write() method, which actually writes the Putter's
    // data to the database
    if err := b.Write(); err != nil {
        return fmt.Errorf("DB write error: %v", err)
    }
    // report progress to the user
    s.updateStats(s.numUncommitted, 0, 0, time.Since(start))
    // reset the counters
    s.numUncommitted = 0
    s.bytesUncommitted = 0
    return nil
}
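To see the threshold logic in isolation, here is a toy version of commit with a fake batch standing in for ethdb.Batch (idealBatchSize mirrors ethdb.IdealBatchSize; everything else is illustrative): writes accumulate in memory and only hit disk once enough bytes have piled up, unless force is set, as in the deferred commit(true) at loop exit.

package main

import "fmt"

const idealBatchSize = 100 * 1024 // mirrors ethdb.IdealBatchSize

// toyBatch stands in for ethdb.Batch: Put buffers, Write "flushes".
type toyBatch struct{ data map[string][]byte }

func (b *toyBatch) Put(k string, v []byte) { b.data[k] = v }
func (b *toyBatch) Write() error {
    fmt.Printf("flushed %d entries to disk\n", len(b.data))
    b.data = map[string][]byte{}
    return nil
}

type syncer struct {
    batch            *toyBatch
    bytesUncommitted int
}

// commit flushes the buffered writes once they exceed the threshold,
// or unconditionally when force is true.
func (s *syncer) commit(force bool) error {
    if !force && s.bytesUncommitted < idealBatchSize {
        return nil // not enough buffered yet; keep accumulating
    }
    if err := s.batch.Write(); err != nil {
        return err
    }
    s.bytesUncommitted = 0
    return nil
}

func main() {
    s := &syncer{batch: &toyBatch{data: map[string][]byte{}}}
    s.batch.Put("node", make([]byte, 4096))
    s.bytesUncommitted += 4096

    s.commit(false) // below threshold: a no-op
    s.commit(true)  // forced: flushes regardless of size
}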
That concludes this analysis of StateSync.go under Ethereum's Downloader module. Hopefully the above is of some help and you've learned something from it. If you found the article useful, feel free to share it so more people can see it.