您好,登錄后才能下訂單哦!
LearnerZooKeeperServer是所有Follower和Observer的父類,在LearnerZooKeeperServer里有2個重要的屬性:
//提交請求處理器
protected CommitProcessor commitProcessor;
//同步處理器
protected SyncRequestProcessor syncProcessor;
FollowerZooKeeperServer和ObserverZooKeeperServer都繼承了LearnerZooKeeperServer服務器。
//待同步的請求
ConcurrentLinkedQueue<Request> pendingSyncs;
//待處理的事務請求
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
構建請求處理鏈,FollowerZooKeeperServer的請求處理鏈是:
FollowerRequestProcessor -> CommitProcessor ->FinalRequestProcessor
@Override
protected void setupRequestProcessors() {
//最后的處理器
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
//第二個處理器
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
//第一個請求處理器FollowerRequestProcessor
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
該函數將請求進行記錄(放入到對應的隊列中),等待處理。
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
//zxid不等于0,說明此服務器已經處理過請求
if ((request.zxid & 0xffffffffL) != 0) {
// 將該請求放入pendingTxns中,等待事務處理
pendingTxns.add(request);
}
// 使用SyncRequestProcessor處理請求(其會將請求放在隊列中,異步進行處理)
syncProcessor.proce***equest(request);
}
函數會提交zxid對應的請求(pendingTxns的隊首元素),其首先會判斷隊首請求對應的zxid是否為傳入的zxid,然后再進行移除和提交(放在committedRequests隊列中)。
public void commit(long zxid) {
// 沒有還在等待處理的事務
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
// 隊首元素的zxid
long firstElementZxid = pendingTxns.element().zxid;
// 如果隊首元素的zxid不等于需要提交的zxid,則退出程序
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
// 從待處理事務請求隊列中移除隊首請求
Request request = pendingTxns.remove();
// 提交該請求
commitProcessor.commit(request);
}
// 同步處理器是否可用,系統參數控制
private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
// 待同步請求隊列
ConcurrentLinkedQueue<Request> pendingSyncs =
new ConcurrentLinkedQueue<Request>();
構建請求處理鏈,ObserverZooKeeperServer的請求處理鏈是:ObserverRequestProcessor->CommitProcessor->FinalRequestProcessor,可能會存在SyncRequestProcessor。
@Override
protected void setupRequestProcessors() {
// We might consider changing the processor behaviour of
// Observers to, for example, remove the disk sync requirements.
// Currently, they behave almost exactly the same as followers.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
/*
* Observer should write to disk, so that the it won't request
* too old txn from the leader which may lead to getting an entire
* snapshot.
*
* However, this may degrade performance as it has to write to disk
* and do periodic snapshot which may double the memory requirements
*/
//是否使用同步處理器,看系統參數配置,會影響性能
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}
同步處理器可用,則使用同步處理器進行處理(放入同步處理器的queuedRequests隊列中),然后提交請求(放入提交請求處理器的committedRequests隊列中)
public void commitRequest(Request request) {
if (syncRequestProcessorEnabled) {
// Write to txnlog and take periodic snapshot
//寫事務日志,并定期快照
syncProcessor.proce***equest(request);
}
commitProcessor.commit(request);
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。