您好,登錄后才能下訂單哦!
Zookeeper 使用 Zookeeper Atomic Broadcast (ZAB) 協議來保障分布式數據一致性。
ZAB是一種支持崩潰恢復的消息廣播協議,采用類似2PC的廣播模式保證正常運行時性能,并使用基于 Paxos 的策略保證崩潰恢復時的一致性。
有的Follower服務器分發Commit消息,要求其將前一個Proposal進行提交。
ZAB一些包括兩種基本的模式:崩潰恢復和消息廣播。
1、當整個服務框架啟動過程中或Leader服務器出現網絡中斷、崩潰退出與重啟等異常情況時,ZAB協議就會進入恢復模式并選舉產生新的Leader服務器。當選舉產生了新的Leader服務器,同時集群中已經有過半的機器與該Leader服務器完成了狀態同步之后,ZAB協議就會退出恢復模式,狀態同步是指數據同步,用來保證集群在過半的機器能夠和Leader服務器的數據狀態保持一致。
2、當集群中已經有過半的Follower服務器完成了和Leader服務器的狀態同步,那么整個服務框架就可以進入消息廣播模式,當一臺同樣遵守ZAB協議的服務器啟動后加入到集群中,如果此時集群中已經存在一個Leader服務器在負責進行消息廣播,那么加入的服務器就會自覺地進入數據恢復模式:找到Leader所在的服務器,并與其進行數據同步,然后一起參與到消息廣播流程中去。Zookeeper只允許唯一的一個Leader服務器來進行事務請求的處理,Leader服務器在接收到客戶端的事務請求后,會生成對應的事務提議并發起一輪廣播協議,而如果集群中的其他機器收到客戶端的事務請求后,那么這些非Leader服務器會首先將這個事務請求轉發給Leader服務器。
3、當Leader服務器出現崩潰或者機器重啟、集群中已經不存在過半的服務器與Leader服務器保持正常通信時,那么在重新開始新的一輪的原子廣播事務操作之前,所有進程首先會使用崩潰恢復協議來使彼此到達一致狀態,于是整個ZAB流程就會從消息廣播模式進入到崩潰恢復模式。一個機器要成為新的Leader,必須獲得過半機器的支持,同時由于每個機器都有可能會崩潰,因此,ZAB協議運行過程中,前后會出現多個Leader,并且每臺機器也有可能會多次成為Leader,進入崩潰恢復模式后,只要集群中存在過半的服務器能夠彼此進行正常通信,那么就可以產生一個新的Leader并再次進入消息廣播模式。如一個由三臺機器組成的ZAB服務,通常由一個Leader、2個Follower服務器組成,某一個時刻,加入其中一個Follower掛了,整個ZAB集群是不會中斷服務的。
ZAB協議中節點存在四種狀態:
Leading: 當前節點為集群 Leader,負責協調事務
Following: 當前節點為 Follower 在 Leader 協調下執行事務
Looking: 集群沒有正在運行的 Leader, 正處于選舉過程
Observing: 節點跟隨 Leader 保存系統最新的狀態提供讀服務,但不參與選舉和事務投票
Zab協議消息廣播有以下4個步驟組成:
- Leader發送PROPOSAL給集群中所有的節點。
- 節點在收到PROPOSAL之后,把PROPOSAL落盤,發送一個ACK給Leader。
- Leader在收到大多數節點的ACK之后,發送COMMIT給集群中所有的Follower節點。
- 如果存在Observer節點,Leader同時發送INFORM信息給Observer服務節點同步數據,Observer只接收Leader的INFORM消息同步數據,不參與Leader選舉和事務提交。
在Leader服務器出現崩潰,或者由于網絡原因導致Leader服務器失去了與過半Follower的聯系,那么就會進入崩潰恢復模式,在ZAB協議中,為了保證程序的正確運行,整個恢復過程結束后需要選舉出一個新的Leader服務器,因此,ZAB協議需要一個高效且可靠的Leader選舉算法,從而保證能夠快速地選舉出新的Leader,同時,Leader選舉算法不僅僅需要讓Leader自身知道已經被選舉為Leader,同時還需要讓集群中的所有其他機器也能夠快速地感知到選舉產生的新的Leader服務器。
ZAB協議的基本原則
假設一個事務在Leader服務器上被提交了,并且已經得到了過半Follower服務器的Ack反饋,但是在它Commit消息發送給所有Follower機器之前,Leader服務掛了。如下圖所示:
在集群正常運行過程中的某一個時刻,Server1是Leader服務器,其先后廣播了P1、P2、C1、P3、C2(C2是Commit Of Proposal2的縮寫),其中,當Leader服務器發出C2后就立即崩潰退出了,針對這種情況,ZAB協議就需要確保事務Proposal2最終能夠在所有的服務器上都被提交成功,否則將出現不一致。
如果在崩潰恢復過程中出現一個需要被丟棄的提議,那么在崩潰恢復結束后需要跳過該事務Proposal,如下圖所示:
假設初始的Leader服務器Server1在提出一個事務Proposal3之后就崩潰退出了,從而導致集群中的其他服務器都沒有收到這個事務Proposal,于是,當Server1恢復過來再次加入到集群中的時候,ZAB協議需要確保丟棄Proposal3這個事務。
能夠確保提交已經被Leader提交的事務的Proposal,同時丟棄已經被跳過的事務Proposal。如果讓Leader選舉算法能夠保證新選舉出來的Leader服務器擁有集群中所有機器最高編號(ZXID最大)的事務Proposal,那么就可以保證這個新選舉出來的Leader一定具有所有已經提交的提議,更為重要的是如果讓具有最高編號事務的Proposal機器稱為Leader,就可以省去Leader服務器查詢Proposal的提交和丟棄工作這一步驟了。
完成Leader選舉后,在正式開始工作前,Leader服務器首先會確認日志中的所有Proposal是否都已經被集群中的過半機器提交了,即是否完成了數據同步。Leader服務器需要確所有的Follower服務器都能夠接收到每一條事務Proposal,并且能夠正確地將所有已經提交了的事務Proposal應用到內存數據庫中。Leader服務器會為每個Follower服務器維護一個隊列,并將那些沒有被各Follower服務器同步的事務以Proposal消息的形式逐個發送給Follower服務器,并在每一個Proposal消息后面緊接著再發送一個Commit消息,以表示該事務已經被提交,等到Follower服務器將所有其尚未同步的事務Proposal都從Leader服務器上同步過來并成功應用到本地數據庫后,Leader服務器就會將該Follower服務器加入到真正的可用Follower列表并開始之后的其他流程。
1、 發現,選舉產生Leader,產生最新的epoch(每次選舉產生新Leader的同時產生新epoch)。
2、 同步,各Follower和Leader完成數據同步。
3、廣播,Leader處理客戶端的寫操作,并將狀態變更廣播至Follower,Follower多數通過之后Leader發起將狀態變更落地Commit。
在正常運行過程中,ZAB協議會一直運行于階段三來反復進行消息廣播流程,如果出現崩潰或其他原因導致Leader缺失,那么此時ZAB協議會再次進入發現階段,選舉新的Leader。
ProposalRequestProcessor.proce***equest()方法發送PROPOSAL 給每一個節點。它調用Leader.propose()方法把PROPOSAL
入隊到各個follower的queuedPackets,然后直接把PROPOSAL提交給leader節點自己的SyncRequestProcessor 。
以下是大概的代碼路徑:
ProposalRequestProcessor.proce***equest(request)
zks.getLeader().propose(request)
sendPacket(pp)
for f in forwardingFollowers
f.queuePacket(qp)
queuedPackets.add(p)
syncProcessor.proce***equest(request)
SyncRequestProcessor先處理
SyncRequestProcessor.run()
zks.getZKDatabase().append(si)
flush(toFlush)
zks.getZKDatabase().commit()
while (!toFlush.isEmpty())
Request i = toFlush.remove()
if (nextProcessor != null)
nextProcessor.proce***equest(i)
然后是Leader的ACK處理器處理,返回給Leader自己ACK結果
AckRequestProcessor.proce***equest()
proce***equest()
leader.processAck(self.getId(), request.zxid, null)
Follower. followLeader()方法處理接收到的QuorumPacket, case Leader.PROPOSAL分支處理的就是PROPOSAL。
Follower.followLeader()
loop
readPacket(qp)
leaderIs.readRecord(pp, "packet")
processPacket(qp)
case Leader.PROPOSAL
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr)
fzk.logRequest(hdr, txn)
syncProcessor.proce***equest(request)
case Leader.COMMIT:
fzk.commit(qp.getZxid())
commitProcessor.commit(request)
SyncRequestProcessor的處理邏輯
SyncRequestProcessor.run()
zks.getZKDatabase().append(si)
flush(toFlush)
zks.getZKDatabase().commit()
while (!toFlush.isEmpty())
Request i = toFlush.remove()
if (nextProcessor != null)
nextProcessor.proce***equest(i)
QuorumPacket qp = new QuorumPacket(Leader.ACK)
learner.writePacket(qp, false)
leaderOs.writeRecord(pp, "packet")
((Flushable)nextProcessor).flush()
learner.writePacket(null, true)
bufferedOutput.flush()
Leader的processAck()處理ACK消息,如果收到大多數節點的ACK,發送COMMIT給所有的follower節點,并調用leader自己 的CommitProcessor。 processAck()有兩個調用入口:1. LeaderHandler的run()方法處理來自follower的ACK。2. AckRequestProcessor的proce***equest方法處理leader自己的ACK。
Leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress())
Proposal p = outstandingProposals.get(zxid)
p.addAck(sid)
tryToCommit(p, zxid, followerAddr)
if !p.hasAllQuorums()
return false;
// Commit on all followers
commit(zxid)
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null)
sendPacket(qp)
// Commit on Leader
zk.commitProcessor.commit(p.request)
CommitProcessor.run()
request = queuedRequests.poll()
processCommitted()
sendToNextProcessor(pending)
已經提交的請求,交給ToBeAppliedRequestProcessor準備應用到內存數據庫
ToBeAppliedRequestProcessor.proce***equest()
next.proce***equest(request)
最后交給FinalRequestProcessor,返回響應結果
CommitProcessor.run()
request = queuedRequests.poll()
processCommitted()
sendToNextProcessor(pending)
//返回響應結果
FinalRequestProcessor.proce***equest()
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。