亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》
  • 首頁 > 
  • 教程 > 
  • 開發技術 > 
  • mqtt協議-broker之moqutte源碼研究五之UNSUBSCRIBE與DISCONN報文處理

mqtt協議-broker之moqutte源碼研究五之UNSUBSCRIBE與DISCONN報文處理

發布時間:2020-07-28 07:41:51 來源:網絡 閱讀:3346 作者:xingyuntian 欄目:開發技術

本文講解moquette對UNSUBSCRIBE和DISCONNECT的處理

先說UNSUBSCRIBE,代碼比較簡單

    public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) {
    List<String> topics = msg.payload().topics();
    String clientID = NettyUtils.clientID(channel);

    LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, topics);

    ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
    for (String t : topics) {
        Topic topic = new Topic(t);
        boolean validTopic = topic.isValid();
        if (!validTopic) {
            // close the connection, not valid topicFilter is a protocol violation
            channel.close();
            LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", clientID, topics, topic);
            return;
        }
        if(LOG.isDebugEnabled()){
            LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic);
        }
        subscriptions.removeSubscription(topic, clientID);
        clientSession.unsubscribeFrom(topic);
        String username = NettyUtils.userName(channel);
        m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username);
    }

    // ack the client
    int messageID = msg.variableHeader().messageId();
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0);
    MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));

    LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", clientID, topics, messageID);
    channel.writeAndFlush(ackMessage);
}

主要分為以下幾步
1.從目錄樹下,移除該client的訂閱,這個移除過程有點復雜,后面單獨一篇專門講解topic樹
2.清除ClientSession里面的訂閱,包括Set<Subscription> subscriptions,同時還得移除ISubscriptionsStore里面的Map<Topic, Subscription> subscriptions
3.喚醒攔截器
4.返回UNSUBACK ,這里注意UNSUBACK 是沒有payload的。

再說DISCONNECT的處理

public void processDisconnect(Channel channel) throws InterruptedException {
    final String clientID = NettyUtils.clientID(channel);
    LOG.info("Processing DISCONNECT message. CId={}", clientID);
    channel.flush();
    final ConnectionDescriptor existingDescriptor = this.connectionDescriptors.getConnection(clientID);
    if (existingDescriptor == null) {
        // another client with same ID removed the descriptor, we must exit
        channel.close();
        return;
    }

    if (existingDescriptor.doesNotUseChannel(channel)) {
        // another client saved it's descriptor, exit
        LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!removeSubscriptions(existingDescriptor, clientID)) {
        LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!dropStoredMessages(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop stored messages. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!cleanWillMessageAndNotifyInterceptor(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop will message. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!existingDescriptor.close()) {
        LOG.info("The connection has been closed. CId={}", clientID);
        return;
    }

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    LOG.info("The DISCONNECT message has been processed. CId={}", clientID);
}

1.檢查連接描述符是否還存在,如果不存在,說明之前已經有客戶端刪除了它,直接關閉通道
2.判斷這個client的連接描述符是不是,是不是還是當前使用這個通道的client?作者要先防止這種情況呢?先賣個關子,后面的第6條會說明
3.清除訂閱請求,這里面好像只清楚了不要求保存會話信息的clientsession里面的ISessionsStore里面的Map<Topic, Subscription> subscriptions,而并沒有清除ClientSession里面的Set<Subscription> subscriptions和topic樹里面的訂閱,這能夠解釋https://blog.51cto.com/13579730/2073914 這篇文章結尾討論的問題了,只有Map<Topic, Subscription> subscriptions的訂閱才是最準確的。
4.丟棄存儲的消息,這里面也只是會丟棄不要去保存會話信息的消息
5.清除遺愿消息,對于遺愿消息,這里稍微啰嗦一點,遺愿消息是在初次連接的存儲到ProtocolProcessor的ConcurrentMap<String, WillMessage> m_willStore這里面的,那么什么時候發送給訂閱者呢?看下面

    io.moquette.server.netty.NettyMQTTHandler#channelInactive
    @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    String clientID = NettyUtils.clientID(ctx.channel());
    if (clientID != null && !clientID.isEmpty()) {
        LOG.info("Notifying connection lost event. MqttClientId = {}.", clientID);
        m_processor.processConnectionLost(clientID, ctx.channel());
    }
    ctx.close();
}
    說明是當netty檢測到通道不活躍的時候通知ProtocolProcessor處理ConnectionLost事件的。
    public void processConnectionLost(String clientID, Channel channel) {
    LOG.info("Processing connection lost event. CId={}", clientID);
    ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true);
    connectionDescriptors.removeConnection(oldConnDescr);//移除連接描述符
    // publish the Will message (if any) for the clientID
    if (m_willStore.containsKey(clientID)) {
        WillMessage will = m_willStore.get(clientID);
        forwardPublishWill(will, clientID);//發布遺愿消息
        m_willStore.remove(clientID);//移除遺愿消息存儲
    }

    String username = NettyUtils.userName(channel);
    m_interceptor.notifyClientConnectionLost(clientID, username);//喚醒攔截器
}
    在以下這種情況下會發布遺愿消息
    遺囑消息發布的條件,包括但不限于:
    服務端檢測到了一個I/O錯誤或者網絡故障。
    客戶端在保持連接(Keep Alive)的時間內未能通訊。
    客戶端沒有先發送DISCONNECT報文直接關閉了網絡連接。
    由于協議錯誤服務端關閉了網絡連接。

    另外說明一下,遺愿消息是可以設置消息等級的,而且可以被設置成retain消息

6.連接描述符集合里面清除該通道對應的連接描述符,這里有一點很容易誤解,強調一下

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    作者調用的是ConcurrentMap里面的boolean remove(Object key, Object value);這個方法要求key存在,且value 與預期的一樣才會刪除,也就說,是有可能存在的,key一樣而value不一樣的情況的,什么時候會出現?答案是client在兩個設備上先后登陸,這個時候由于是存在一個map里面的所以后面的登陸所創建的連接描述符會覆蓋前面的一個。當然這里面,也可以在覆蓋之前強制斷開之前那個連接,但是moquette并沒有這么做,具體看源碼io.moquette.server.ConnectionDescriptorStore#addConnection

也就說說moquette是允許存在一個賬號多設備登陸的。將入client先后在A,B兩個設備上建立連接,B連接會覆蓋A連接,這個時候A連接雖然還在,但其實是永遠也收不到消息的,因為發送消息的時候,會以ConnectionDescriptorStore里面存儲的為準,具體看源碼
io.moquette.server.ConnectionDescriptorStore#sendMessage,也就是說A連接會無謂的占用broker的資源,個人覺得這樣并不好,也非常沒有必要,大家可以自行改進。
現在大家就能夠理解上面的第2步了,因為這個就是為雙登陸的情況下,被覆蓋的那個連接準備的。

moquette-broker還要處理以下的報文,包括
1.PINGREQ,心跳報文 
2.PUBACK,當broker向client發送qos1消息的時候,client需要回復PUBACK消息,消息存儲在
io.moquette.spi.ClientSession.OutboundFlightZone outboundFlightZone里面(底層使用map存儲的),
消息是io.moquette.spi.impl.MessagesPublisher#publish3Subscribers(io.moquette.spi.IMessagesStore.StoredMessage, io.moquette.spi.impl.subscriptions.Topic)
這里被存儲進去的,這是一個臨時的存儲,存儲完之后消息會被刪除掉
3.PUBREC 這個是當broker向client發送qos2消息之后,client需要向broker作的第一個返回報文,
這里面有個動作是將消息從inboundFlightMessages轉移到secondPhaseStore和outboundFlightMessages,具體看這
里io.moquette.persistence.memory.MemorySessionStore#moveInFlightToSecondPhaseAckWaiting
4.PUBCOMP 當broker收到這個報文的時候會負責從內存里面刪除飛行窗口的消息,具體怎么刪除的詳見下篇,moquette攔截器
5.PUBREL。當client向broker發送qos2消息的時候,broker會回復PUBREC,告訴client已經記錄下來了,
client收到PUBREC之后會發送PUBREL,告訴broker,我知道你已經記錄了消息,既然你記錄了,那這邊
就釋放消息了(確保只要broker才會該消息,避免client重發),當broker收到PUBREL報文的時候,就知道
client那邊已經把該消息釋放了,然后消息的主導權到了他這邊,他開始發送消息。當消息發送完成了,
會向client發送PUBCOMP報文。
關于qos2消息的介紹可以看一下這里https://github.com/mcxiaoke/mqtt/blob/master/mqtt/04-OperationalBehavior.md 的4.3條
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

锦屏县| 镇巴县| 七台河市| 凤城市| 鲁山县| 德阳市| 西昌市| 桓台县| 大安市| 万安县| 邢台县| 红河县| 上高县| 常德市| 阳城县| 阆中市| 远安县| 永清县| 阿鲁科尔沁旗| 如皋市| 宁津县| 留坝县| 塔河县| 贵南县| 靖宇县| 和龙市| 金华市| 霍邱县| 鹤峰县| 临沧市| 门头沟区| 长汀县| 枣庄市| 德安县| 滁州市| 万安县| 蕉岭县| 黔西县| 东兰县| 阳原县| 威海市|