您好,登錄后才能下訂單哦!
這篇文章主要介紹“zk中learner的作用是什么”,在日常操作中,相信很多人在zk中learner的作用是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”zk中learner的作用是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
learner時observer,follower的父類,定義了公共屬性和方法
子類 Follower 和Observer
內部類:
PacketInFlight表示在提議中還沒有commit的消息
static class PacketInFlight { TxnHeader hdr; Record rec; }
屬性:
QuorumPeer | 服務器節點 |
LearnerZooKeeperServer | learner的服務節點 |
BufferedOutputStream | 輸出流 |
Socket | 端口套接字 |
InetSocketAddress | 地址信息 |
InputArchive | 輸入存檔 |
OutputArchive | 輸出存檔 |
leaderProtocolVersion | leader協議版本 |
BUFFERED_MESSAGE_SIZE | 緩存信息大小 |
MessageTracker | 順序接收和發送信息 |
方法
validateSession(ServerCnxn cnxn, long clientId, int timeout) | 驗證session有效性 |
writePacket(QuorumPacket pp, boolean flush) | 發送包給leader |
readPacket(QuorumPacket pp) | 從leader讀取message |
request(Request request) | 發送request給leader |
findLeader | 查找認為是leader的地址信息 |
createSocket() | 創建socket對象 |
registerWithLeader(int pktType) | 執行handshake protocal建立follower/observer連接 |
到服務器驗證session有效性
void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException { LOG.info("Revalidating client: 0x" + Long.toHexString(clientId)); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); dos.writeLong(clientId); dos.writeInt(timeout); dos.close(); QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null); pendingRevalidations.put(clientId, cnxn); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, ZooTrace.SESSION_TRACE_MASK, "To validate session 0x" + Long.toHexString(clientId)); } writePacket(qp, true); } void writePacket(QuorumPacket pp, boolean flush) throws IOException { synchronized (leaderOs) { if (pp != null) { messageTracker.trackSent(pp.getType()); leaderOs.writeRecord(pp, "packet"); } if (flush) { bufferedOutput.flush(); } } } void request(Request request) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream oa = new DataOutputStream(baos); oa.writeLong(request.sessionId); oa.writeInt(request.cxid); oa.writeInt(request.type); if (request.request != null) { request.request.rewind(); int len = request.request.remaining(); byte[] b = new byte[len]; request.request.get(b); request.request.rewind(); oa.write(b); } oa.close(); QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); writePacket(qp, true); } 查找當前的leader信息 protected QuorumServer findLeader() { QuorumServer leaderServer = null; // Find the leader by id Vote current = self.getCurrentVote(); for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { // Ensure we have the leader's correct IP address before // attempting to connect. s.recreateSocketAddresses(); leaderServer = s; break; } } if (leaderServer == null) { LOG.warn("Couldn't find the leader with id = " + current.getId()); } return leaderServer; } 連接套接字 sockConnect(Socket sock, InetSocketAddress addr, int timeout) 建立和leader的連接 /** * Establish a connection with the LearnerMaster found by findLearnerMaster. * Followers only connect to Leaders, Observers can connect to any active LearnerMaster. * Retries until either initLimit time has elapsed or 5 tries have happened. * @param addr - the address of the Peer to connect to. * @throws IOException - if the socket connection fails on the 5th attempt * if there is an authentication failure while connecting to leader * @throws X509Exception * @throws InterruptedException */ protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception { this.sock = createSocket(); this.leaderAddr = addr; // leader connection timeout defaults to tickTime * initLimit int connectTimeout = self.tickTime * self.initLimit; // but if connectToLearnerMasterLimit is specified, use that value to calculate // timeout instead of using the initLimit value if (self.connectToLearnerMasterLimit > 0) { connectTimeout = self.tickTime * self.connectToLearnerMasterLimit; } int remainingTimeout; long startNanoTime = nanoTime(); for (int tries = 0; tries < 5; tries++) { try { // recalculate the init limit time because retries sleep for 1000 milliseconds remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000); if (remainingTimeout <= 0) { LOG.error("connectToLeader exceeded on retries."); throw new IOException("connectToLeader exceeded on retries."); } sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout)); if (self.isSslQuorum()) { //開始握手 ((SSLSocket) sock).startHandshake(); } sock.setTcpNoDelay(nodelay); break; } catch (IOException e) { //出現異常 remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000); //剩余超時時間 if (remainingTimeout <= 1000) { //打印錯誤日志 LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries + ", remaining init limit=" + remainingTimeout + ", connecting to " + addr, e); throw e; //嘗試次數大于4 } else if (tries >= 4) { //打印錯誤日志 LOG.error("Unexpected exception, retries exceeded. tries=" + tries + ", remaining init limit=" + remainingTimeout + ", connecting to " + addr, e); throw e; } else { //發出警告 LOG.warn("Unexpected exception, tries=" + tries + ", remaining init limit=" + remainingTimeout + ", connecting to " + addr, e); //重新嘗試建立socket連接 this.sock = createSocket(); } } //讀取配置延時時間,默認100ns Thread.sleep(leaderConnectDelayDuringRetryMs); } self.authLearner.authenticate(sock, hostname); leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }
到此,關于“zk中learner的作用是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。