亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

mqtt協議-broker之moqutte源碼研究二之Connect報文處理

發布時間:2020-06-29 10:43:46 來源:網絡 閱讀:7563 作者:xingyuntian 欄目:開發技術

先上一個圖,大概說明一下moquette 的類之間的關系
mqtt協議-broker之moqutte源碼研究二之Connect報文處理

一.ProtocolProcessor類
該類是moquette里面的最終要的類,負責所有報文的處理,持有所有各模塊功能的實現對象的引用, 下面詳細介紹

    protected ConnectionDescriptorStore connectionDescriptors;//所有的連接描述符文存儲,即clientId與通道之間的映射集合
protected ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;//所有當前正在處理的
    訂閱關系的存儲,之所以有這個是過濾無效的訂閱請求
private SubscriptionsDirectory subscriptions;//訂閱目錄,本質上是topic樹
private ISubscriptionsStore subscriptionStore;//所有的訂閱的集合
private boolean allowAnonymous;//是否允許匿名連接
private boolean allowZeroByteClientId;//是否允許clientId為空
private IAuthorizator m_authorizator; //對topic的讀寫權限認證

private IMessagesStore m_messagesStore;//retainMessage的存儲

private ISessionsStore m_sessionsStore;//session 存儲

private IAuthenticator m_authenticator;//連接時候的鑒權認證
private BrokerInterceptor m_interceptor;//各個層面的攔截器

private Qos0PublishHandler qos0PublishHandler;//qos0攔截器
private Qos1PublishHandler qos1PublishHandler;//qos1攔截器
private Qos2PublishHandler qos2PublishHandler;/qos2攔截器
private MessagesPublisher messagesPublisher;//分發消息,遺愿消息,以及集權間同步消息
private InternalRepublisher internalRepublisher;//保留消息,qos1,qos2消息重發器
    ConcurrentMap<String, WillMessage> m_willStore//遺愿消息存儲

    幾乎所有的功能的源頭都在這個類里面

二.對14種報文的處理,都在ProtocolProcessor類,后面會分篇挨個講解moquette對這14個報文的處理
具體哪14中文報文如下

名字                   值           報文流動方向                      描述

Reserved 0 禁止 保留
CONNECT 1 客戶端到服務端 客戶端請求連接服務端
CONNACK 2 服務端到客戶端 連接報文確認
PUBLISH 3 兩個方向都允許 發布消息
PUBACK 4 兩個方向都允許 QoS 1消息發布收到確認
PUBREC 5 兩個方向都允許 發布收到(保證交付第一步)
PUBREL 6 兩個方向都允許 發布釋放(保證交付第二步)
PUBCOMP 7 兩個方向都允許 QoS 2消息發布完成(保證交互第三步)
SUBSCRIBE 8 客戶端到服務端 客戶端訂閱請求
SUBACK 9 服務端到客戶端 訂閱請求報文確認
UNSUBSCRIBE 10 客戶端到服務端 客戶端取消訂閱請求
UNSUBACK 11 服務端到客戶端 取消訂閱報文確認
PINGREQ 12 客戶端到服務端 心跳請求
PINGRESP 13 服務端到客戶端 心跳響應
DISCONNECT 14 客戶端到服務端 客戶端斷開連接
Reserved 15 禁止 保留

或者到這里看更詳細的mqtt中文翻譯
https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md
非常感謝作者的辛勞工作和無私分享

三.debug跟蹤moquette 對CONNECT報文的處理
大概分為以下幾步
1.驗證協議版本,如果不是mqtt-3.1或者mqtt-3.1.1則拒絕連接
2.驗證clientId是否為空,如果為空,但是配置的時候(在上篇介紹的moquette.cof里面配置)要求不允許唯恐,即上面的allowZeroByteClientId或者cleanSession為false即要求保存會話,則視為不合法,拒絕連接,否則由moquette生成clientId
3.驗證是否有登錄的權限
這里面貼上源碼講解一下
private boolean login(Channel channel, MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
failedCredentials(channel);
return false;
}
if (!m_authenticator.checkValid(clientId, msg.payload().userName(), pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}, password={}",
clientId, msg.payload().userName(), pwd);
failedCredentials(channel);
return false;
}
NettyUtils.userName(channel, msg.payload().userName());
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
failedCredentials(channel);
return false;
}
return true;
}

3.1.如果CONNETCT報文里面的可變頭里面沒有用戶名,直接返回true
3.2.如果有用戶名,同時有密碼,從可變頭取出密碼,調用m_authenticator進行驗證
3.3 如果有用戶名,沒有密碼,認證失敗,拒絕連接
3.4 如果沒有用戶名,同時配置為不允許匿名,則認證失敗

4.創建連接描述符,連接描述符包括clientId,channel,isCleanSession,ConnectState,同時判斷連接描述符集合里面是否包括該連接描述符,如果包含,代表該連接以及建立,斷開連接
5.根據CONNECT報文里面的Keep Alive time 來設置tcp參數
6.根據CONNECT報文遺愿消息標志位,覺得是否存儲遺愿消息
7.返回CONNACK報文,這里面把返回CONNACK報文單獨講解一下

        private boolean sendAck(ConnectionDescriptor descriptor, MqttConnectMessage msg, final String clientId) {
    LOG.info("Sending connect ACK. CId={}", clientId);
    final boolean success = descriptor.assignState(DISCONNECTED, SENDACK);
    if (!success) {
        return false;
    }

    MqttConnAckMessage okResp;
    ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);
    boolean isSessionAlreadyStored = clientSession != null;
    if (!msg.variableHeader().isCleanSession() && isSessionAlreadyStored) {
        okResp = connAckWithSessionPresent(CONNECTION_ACCEPTED);
    } else {
        okResp = connAck(CONNECTION_ACCEPTED);
    }

    if (isSessionAlreadyStored) {
        LOG.info("Cleaning session. CId={}", clientId);
        clientSession.cleanSession(msg.variableHeader().isCleanSession());
    }
    descriptor.writeAndFlush(okResp);
    LOG.info("The connect ACK has been sent. CId={}", clientId);
    return true;
}

        7.1 判斷當前連接的狀態,怎么判斷的呢?這里面用了AtomicReference<ConnectionState>通過調用原子引用類  compareAndSet(DISCONNECTED, SENDACK)來解決并發修改連接狀態的問題。
        7.2如果狀態是disConnect,將狀態修改為sendAck
        7.3 如果CONNETCT報文里面的CleanSession標識設置為0同時broker已經有了client的會話,將CONNACK報文里面的連接確認標志設為1,告訴客戶端,broker已經有了響應的會話信息。否則將連接確認標志設為0
        7.4 如果已經存在相應的client的會話,則根據新的連接,更新clientSession里面的是否清理session屬性

8.喚醒攔截器記錄連接事件
9.創建或者從新加載clientSession,這里面單獨講解一下

            private ClientSession createOrLoadClientSession(ConnectionDescriptor descriptor, MqttConnectMessage msg,
        String clientId) {
    final boolean success = descriptor.assignState(SENDACK, SESSION_CREATED);
    if (!success) {
        return null;
    }

    ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);
    boolean isSessionAlreadyStored = clientSession != null;
    if (!isSessionAlreadyStored) {
        clientSession = m_sessionsStore.createNewSession(clientId, msg.variableHeader().isCleanSession());
    }
    if (msg.variableHeader().isCleanSession()) {
        LOG.info("Cleaning session. CId={}", clientId);
        clientSession.cleanSession();
    }
    return clientSession;
}

     9.1 AtomicReference<ConnectionState>通過調用原子引用類  compareAndSet(SENDACK, SESSION_CREATED)將連接狀態從sendAck修改為session_create
     9.2 session存儲結合里面,是否已經存在會話信息,如果不存在,創建一個新的clientsession
     9.3 如果存在,根據CONNETCT報文里面的cleansession自動決定是否清理調舊的會話信息。

10.如果CONNETCT報文要求不清理會話信息(cleansession標志位為0),則重發QoS1 and QoS2 messages,同時將連接狀態從session_create修改成message_republish
11.將連接狀態從session_create修改成established

到此,broker和client直接的mqtt連接正式建立,后面client可以開始發送SUBSCRIBE或者PUBLISH報文了。
在這里再補充一點,對于broker來說,建立連接的過程中,連接狀態會從disConnect->sendAck->session_create->message_republish->established,之所以要設置這些狀態,是因為,每一步后面的操作都要基于前面的狀態來決定是否需要真正執行,這里面用到了原子引用類來保證,狀態的修改這個操作的原子行,確保了在并發的情況下,每一步操作都是條件滿足的。

下面一篇將會講解SUBSCRIBE報文的處理

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新闻| 和政县| 印江| 青河县| 新营市| 沅陵县| 静安区| 揭西县| 盘锦市| 原阳县| 芒康县| 西乌珠穆沁旗| 广德县| 安陆市| 奎屯市| 大名县| 确山县| 九江县| 青岛市| 忻城县| 仁布县| 紫金县| 桓台县| 洮南市| 栖霞市| 运城市| 三原县| 佛山市| 通化市| 边坝县| 沁水县| 紫阳县| 元阳县| 浮梁县| 青神县| 讷河市| 确山县| 遵义县| 太保市| 鄂伦春自治旗| 鹤庆县|