亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎樣建立連接Zookeeper中的服務端

發布時間:2021-09-13 11:55:30 來源:億速云 閱讀:203 作者:柒染 欄目:大數據

這篇文章給大家介紹Zookeeper之怎樣建立連接服務端,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

服務端處理請求的代碼有兩種NIOServerCnxnFactory和NettyServerCnxnFactory,默認是NIOServerCnxnFactory,可以通過指定zookeeper.serverCnxnFactory參數來修改。

這兩個類邏輯是一樣的,只是一個用的java原生的NIO,一個用的netty,這里我們就分析下NIOServerCnxnFactory。

NIOServerCnxnFactory實現了Runnable接口,看下它的run方法,循環處理請求

//NIOServerCnxnFactory.java
//第200行
public void run() {
    while (!ss.socket().isClosed()) {
        try {
            selector.select(1000);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                selected);
            Collections.shuffle(selectedList);
            for (SelectionKey k : selectedList) {
                //如果是連接請求
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k
                                        .channel()).accept();
                    InetAddress ia = sc.socket().getInetAddress();
                    //獲取IP地址對應的客戶端連接數
                    int cnxncount = getClientCnxnCount(ia);
                    //如果超出則關閉
                    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                        LOG.warn("Too many connections from " + ia
                                 + " - max is " + maxClientCnxns );
                        sc.close();
                    } else {
                        LOG.info("Accepted socket connection from "
                                 + sc.socket().getRemoteSocketAddress());
                        sc.configureBlocking(false);
                        SelectionKey sk = sc.register(selector,
                                                      SelectionKey.OP_READ);
                        //每一個連接都是一個NIOServerCnxn
                        NIOServerCnxn cnxn = createConnection(sc, sk);
                        sk.attach(cnxn);
                        addCnxn(cnxn);
                    }
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    //在第二個循環的時候,會進入這里,處理真正的連接請求
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                    c.doIO(k);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Unexpected ops in select "
                                  + k.readyOps());
                    }
                }
            }
            selected.clear();
        } catch (RuntimeException e) {
            LOG.warn("Ignoring unexpected runtime exception", e);
        } catch (Exception e) {
            LOG.warn("Ignoring exception", e);
        }
    }
    closeAll();
    LOG.info("NIOServerCnxn factory exited run method");
}

//NIOServerCnxn.java
//第237行
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (isSocketOpen() == false) {
            LOG.warn("trying to do i/o on a null socket for session:0x"
                     + Long.toHexString(sessionId));

            return;
        }
        if (k.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                    "Unable to read additional data from client sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely client has closed socket");
            }
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) { // not the case for 4letterword
                    readPayload();
                }
                else {
                    // four letter words take care
                    // need not do anything else
                    return;
                }
            }
        }
        //省略部分代碼
    } catch (CancelledKeyException e) {
        
    } catch (CloseRequestException e) {
        
    } catch (EndOfStreamException e) {
        
    } catch (IOException e) {
        
    }
}

//NIOServerCnxn.java
//第194行
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            throw new EndOfStreamException(
                "Unable to read additional data from client sessionid 0x"
                + Long.toHexString(sessionId)
                + ", likely client has closed socket");
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        packetReceived();
        incomingBuffer.flip();
        if (!initialized) {
            readConnectRequest();
        } else {
            readRequest();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

//NIOServerCnxn.java
//第434行
private void readConnectRequest() throws IOException, InterruptedException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    initialized = true;
}

//ZookeeperServer.java
//第886行
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    connReq.deserialize(bia, "connect");
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request from client "
                  + cnxn.getRemoteSocketAddress()
                  + " client's lastZxid is 0x"
                  + Long.toHexString(connReq.getLastZxidSeen()));
    }
    boolean readOnly = false;
    try {
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
        // this is ok -- just a packet from an old client which
        // doesn't contain readOnly field
        LOG.warn("Connection request from old client "
                 + cnxn.getRemoteSocketAddress()
                 + "; will be dropped if server is in r-o mode");
    }
    //如果客戶端沒有設置readOnly,但是服務端是只讀的,直接拋出異常關閉連接
    if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client "
            + cnxn.getRemoteSocketAddress();
        LOG.info(msg);
        throw new CloseRequestException(msg);
    }
    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
            + cnxn.getRemoteSocketAddress()
            + " as it has seen zxid 0x"
            + Long.toHexString(connReq.getLastZxidSeen())
            + " our last zxid is 0x"
            + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
            + " client must try another server";

        LOG.info(msg);
        throw new CloseRequestException(msg);
    }
    //協商session超時時間
    int sessionTimeout = connReq.getTimeOut();
    byte passwd[] = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    long sessionId = connReq.getSessionId();
    if (sessionId != 0) {
        //如果sessionId不是0,說明是之前已經連接過的客戶端因為掉線等原因重新連接的情況
        long clientSessionId = connReq.getSessionId();
        LOG.info("Client attempting to renew session 0x"
                 + Long.toHexString(clientSessionId)
                 + " at " + cnxn.getRemoteSocketAddress());
        serverCnxnFactory.closeSession(sessionId);
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    } else {
        LOG.info("Client attempting to establish new session at "
                 + cnxn.getRemoteSocketAddress());
        createSession(cnxn, passwd, sessionTimeout);
    }
}

//ZookeeperServer.java
//第617行
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    //創建一個session,zookeeper的session管理比較復雜,具體情況下一章分析
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    //響應客戶端
    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
    return sessionId;
}

//ZookeeperServer.java
//第728行
public void submitRequest(Request si) {
    //省略部分代碼
    
    try {
        //刷新session的超時時間
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            //提交給PrepRequestProcessor進一步處理
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

//PrepRequestProcessor.java
//第294行
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
            throws KeeperException, IOException, RequestProcessorException {
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                Time.currentWallTime(), type);

    switch (type) {
        //省略部分代碼
        
        case OpCode.createSession:
            request.request.rewind();
            int to = request.request.getInt();
            request.txn = new CreateSessionTxn(to);
            request.request.rewind();
            //這里又調用了一次addSession,但是之前的代碼其實已經新增過了,不太明白為什么
            zks.sessionTracker.addSession(request.sessionId, to);
            zks.setOwner(request.sessionId, request.getOwner());
            break;
            
        //省略部分代碼
            
        default:
            LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type);
    }
}

關于Zookeeper之怎樣建立連接服務端就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

柳林县| 南木林县| 略阳县| 太仓市| 蒲江县| 达日县| 靖州| 麻江县| 洪雅县| 康平县| 阳高县| 黑山县| 金沙县| 莱阳市| 琼中| 青田县| 新竹县| 鄢陵县| 昌宁县| 宁德市| 九龙县| 高碑店市| 亳州市| 昭平县| 上林县| 渭南市| 靖边县| 阿克| 临漳县| 普兰店市| 平乡县| 嵩明县| 许昌市| 嘉善县| 伽师县| 商洛市| 上蔡县| 苏州市| 景东| 明溪县| 彰化市|