亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spring integration怎么連接MQTT

發布時間:2023-03-11 11:29:35 來源:億速云 閱讀:112 作者:iii 欄目:開發技術

本篇內容主要講解“spring integration怎么連接MQTT”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“spring integration怎么連接MQTT”吧!

MQTT一種物聯網數據傳輸協議,構建在TCP之上,采用發布與訂閱的模式進行數據交互,發布與訂閱是兩個獨立的連接通道,這里采用spring-integration-mqt來實現發布與訂閱MQTT,與直接采用MQTT的SDK相對要簡單許多,服務端采用ActiveMQ來支持MQTT的消息服務并實現消息轉發。

首先需要引入spring-integration-mqt的包

這里只需要引入這一個包即可。

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
     <version>5.3.1.RELEASE</version>
</dependency>

MQTT的配置比較簡單

和spring-integration集成一樣,需要配置相對應的入站、出站就可以了

具體配置如下:

package org.noka.serialservice.config;
 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.noka.serialservice.service.MsgSendService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.support.MessageBuilder;
 
/**--------------------------------------------------------------
 * MQTT 數據轉發服務
 * mqtt.services MQTT服務地址不配置時,不會啟用該服務
 * 檢測mqtt.services這個參數是否配置,以確定是否啟用MQTT服務
 * @author  xiefangjian@163.com
 * @version 1.0.0
 **------------------------------------------------------------*/
@EnableIntegration
@Configuration
@ConditionalOnProperty("mqtt.services")
public class MQTTConfig implements ApplicationListener<ApplicationEvent> {
    private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class);
 
    private final MsgSendService msgSendService;//發布消息到消息中間件接口
 
    @Value("${mqtt.appid:mqtt_id}")
    private String appid;//客戶端ID
 
    @Value("${mqtt.input.topic:mqtt_input_topic}")
    private String[] inputTopic;//訂閱主題,可以是多個主題
 
    @Value("${mqtt.out.topic:mqtt_out_topic}")
    private String[] outTopic;//發布主題,可以是多個主題
 
    @Value("${mqtt.services:#{null}}")
    private String[] mqttServices;//服務器地址以及端口
 
    @Value("${mqtt.user:#{null}}")
    private String user;//用戶名
 
    @Value("${mqtt.password:#{null}}")
    private String password;//密碼
 
    @Value("${mqtt.KeepAliveInterval:300}")
    private Integer KeepAliveInterval;//心跳時間,默認為5分鐘
 
    @Value("${mqtt.CleanSession:false}")
    private Boolean CleanSession;//是否不保持session,默認為session保持
 
    @Value("${mqtt.AutomaticReconnect:true}")
    private Boolean AutomaticReconnect;//是否自動重聯,默認為開啟自動重聯
 
    @Value("${mqtt.CompletionTimeout:30000}")
    private Long CompletionTimeout;//連接超時,默認為30秒
 
    @Value("${mqtt.Qos:1}")
    private Integer Qos;//通信質量,詳見MQTT協議
 
 
    public MQTTConfig(MsgSendService msgSendService) {
        this.msgSendService = msgSendService;
    }
 
    /**
     * MQTT連接配置
     * @return 連接工廠
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//連接工廠類
        MqttConnectOptions options = new MqttConnectOptions();//連接參數
        options.setServerURIs(mqttServices);//連接地址
        if(null!=user) {
            options.setUserName(user);//用戶名
        }
        if(null!=password) {
            options.setPassword(password.toCharArray());//密碼
        }
        options.setKeepAliveInterval(KeepAliveInterval);//心跳時間
        options.setAutomaticReconnect(AutomaticReconnect);//斷開是否自動重聯
        options.setCleanSession(CleanSession);//保持session
        factory.setConnectionOptions(options);
        return factory;
    }
 
    /**
     * 入站管道
     * @param mqttPahoClientFactory
     * @return
     */
    @Bean
    public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立訂閱連接
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);//bytes類型接收
        adapter.setCompletionTimeout(CompletionTimeout);//連接超時的時間
        adapter.setConverter(converter);
        adapter.setQos(Qos);//消息質量
        adapter.setOutputChannelName(ChannelName.INPUT_DATA);//輸入管道名稱
        return adapter;
    }
    /**
     * 向服務器發送數據管道綁定
     * @param connectionFactory tcp連接工廠類
     * @return 消息管道對象
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT)
    public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) {
        //創建一個新的出站管道,由于MQTT的發布與訂閱是兩個獨立的連接,因此客戶端的ID(即APPID)不能與訂閱時所使用的ID一樣,否則在服務端會認為是同一個客戶端,而造成連接失敗
        MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);//bytes類型接收
        outGate.setAsync(true);
        outGate.setCompletionTimeout(CompletionTimeout);//設置連接超時時時
        outGate.setDefaultQos(Qos);//設置通信質量
        outGate.setConverter(converter);
        return outGate;
    }
 
    /**
     * MQTT連接時調用的方法
     * @param event
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof MqttSubscribedEvent) {
            String msg = "OK";
            /**------------------連接時需要發送起始消息,寫在這里-------------**/
            msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build());
        }
    }
}

其中ChanneName是一個常量類

來標識入站、出站管道的名稱,以便在其它需要的地方使用,實現方法如下:

/** -----------------------------------------
 * 管道名稱常量類
 * @author  xiefangjian@163.com
 * @version 1.0.0
 ** ---------------------------------------**/
public class ChannelName {
    public final static String INPUT_DATA="input_data";//入站管道
    public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道
    public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名稱
}

此時所有配置完成,接下來需要做的就是處理接收到的數據和發布數據,以上配置完成以后,接收和發送數據都是通過數據管道來完成,配置的是數據管道名稱。

數據發送網關只是一個接口

用于向指定的數據管道里面發送數據,實現如下:

package org.noka.serialservice.service;
 
import org.noka.serialservice.config.ChannelName;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
 
/**----------------------------------------------------------------
 * 發送消息網關,其它需要發向服務器發送消息時,調用該接口
 * @author  xiefangjian@163.com
 * @version  1.0.0
 **--------------------------------------------------------------**/
@MessagingGateway
@Component
public interface MsgGateway {
    /**
     * MQTT 發送網關
     * @param a 主題,可以指定不同的數據發布主題,在消息中間件里面體現為不同的消息隊列
     * @param out 消息內容
     */
    @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT)
    void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out);
}

在需要的地方,可以向下面這樣調用這個接口,向MQTT服務器發送消息

//topic為主題名稱,out為消息內容
msgGateway.send(topic, out);

MQTT服務器有數據下發時

會自動調將數據放入配置的入站數據管道中,在需要接收數據的地方,向下面這樣配置即可

    /**
     * 服務器有數據下發
     * 用ServiceActivator配置需要接收的數據管道名稱,當該管道里面的數據時,會自動調用該方法
     * @param in 服務器有數據下發時,序列化后的對象,這里使用byte數組
     */
    @ServiceActivator(inputChannel = ChannelName.INPUT_DATA)
    public void upCase(Message<byte[]> in) {
        logger.info("[net service data]========================================");
        logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服務器下發的數據
        logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16進制方式打印服務器下發的數據
        serialService.send(in.getPayload());//將服務器下發的數據轉發給串口
    }

最后是參數配置文件

#--------MQTT---------------------------
#設備ID,唯一標識
mqtt.appid=mqtt_id
#訂閱主題,多個主題用逗號分隔
mqtt.input.topic=mqtt_input_topic
#發布主題
mqtt.out.topic=mqtt_out_topic,aac
#MQTT服務器地址,可以是多個地址
mqtt.services=tcp://47.244.191.41:1883
#mqtt用戶名,默認無
#mqtt.user=guest
#mqtt密碼,默認無
#mqtt.password=guest
#心跳間隔時間,默認3000
#mqtt.KeepAliveInterval=3000
#是否不保持session,默認false
#mqtt.CleanSession=false
#是否自動連接,默認true
#mqtt.AutomaticReconnect=true
#連接超時,默認30000
#mqtt.CompletionTimeout=30000
#傳輸質量,默認1
#mqtt.Qos=1

到此,相信大家對“spring integration怎么連接MQTT”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

河南省| 兴仁县| 和硕县| 明水县| 阿克| 万年县| 三门县| 拜城县| 理塘县| 呼和浩特市| 六枝特区| 浏阳市| 四会市| 临夏县| 呈贡县| 雷山县| 和田县| 资源县| 罗平县| 鄂州市| 咸丰县| 邯郸市| 西盟| 南溪县| 东至县| 呼伦贝尔市| 富裕县| 阳城县| 比如县| 扬中市| 息烽县| 平湖市| 武隆县| 中宁县| 镇赉县| 金昌市| 郎溪县| 修文县| 扎赉特旗| 合江县| 彭阳县|