SpringBoot集成Mqtt发送消息

04-23 7786阅读 0评论

1. MQTT简介

MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即消息队列传输探测,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。

相关概念

三种身份:

SpringBoot集成Mqtt发送消息 第1张

  • 客户端(Client):MQTT 客户端是发送和接收消息的应用程序。
  • 服务器(Broker):也叫“代理”,服务器是处理消息的应用程序,位于发布者和订阅者中间,负责接收消息,并按照某种规则发送给订阅者。
  • 主题(Topic): 主题是消息的标识符,用于区分不同类型的消息。

    MQTT 消息

    MQTT传输的消息可以分为:主题(topic)和负载(payload)两部分

    • 主题,可以理解为消息的类型
    • 负载,可以理解为消息的内容

      消息服务质量QoS(Quality of Service)

      Qos用于保证在不同的网络环境下消息传递的可靠性,分为3个等级

      • 0 消息最多传递一次,消息发布完全依赖底层TCP/IP网络,可能会发生消息丢失, 也就是发出去就不管了,也被叫做“即发即弃”
      • 1 消息传递至少 1 次,确保消息到达,但消息重复可能会发生,发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
      • 2 消息仅传送一次,确保消息到达一次

        2. SpringBoot集成Mqtt

        Spring集成Mqtt常用的有两种方式,一种是直接使用Mqtt的客户端库,如Eclipse Paho,另外一种是spring integration mqtt

        第一种:使用Mqtt客户端库

        依赖引入:org.eclipse.paho.client.mqttv3

        	org.eclipse.paho
        	org.eclipse.paho.client.mqttv3
        	1.2.0
        
        

        服务端配置

        public class MqttSendMsgService {
            private static String clientId = "test";
            private static String username = "admin";
            private static String password = "xxxxxx";
            private static String broker = "tcp://xxxxx:1883";
            public ReturnT mqttSend(String param) {
                MqttClient client;
                try {
                    client = new MqttClient(broker, clientId, new MemoryPersistence());
                    client.setCallback(new MqttCallback() {
                        public void connectionLost(Throwable cause) {
                            System.out.println("Connection lost: " + cause.getMessage());
                        }
                        @Override
                        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                            System.out.println("Message arrived: " + mqttMessage.getPayload());
                        }
                        @Override
                        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                            System.out.println("Delivery complete");
                        }
                    });
                    MqttConnectOptions connOpts = new MqttConnectOptions();
                    connOpts.setUserName(username);
                    connOpts.setPassword(password.toCharArray());
                    client.connect(connOpts);
                    log.info("Connected to MQTT Broker!");
                    //主题
                    String topic="test/simple";
                    //消息
                    String content="发送测试";
                    MqttMessage message = new MqttMessage();
                    message.setQos(1);
                    message.setRetained(false);
                    message.setPayload(content.getBytes());
                    //消息发送
                    client.publish(topic,message);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
                return ReturnT.SUCCESS;
            }
        }
        

        上面这种使用起来比较简单,生产环境使用最多的还是下面这种

        第二种:使用 Spring integration进行集成,这里以发送消息为例

        依赖引入

        	org.springframework.integration
        	spring-integration-mqtt
        	5.5.14
        
        

        添加yaml配置

        mqtt.url = tcp://xxxxx:1883
        mqtt.username = admin
        mqtt.password = 123456
        mqtt.clientId = test
        mqtt.defaultTopic = /test/send
        mqtt.keepAliveInterval = 60
        mqtt.automaticReconnect = true
        mqtt.cleanSession = false
        mqtt.connectionTimeout = 30
        mqtt.maxInflight = 1024
        

        添加对应的属性配置类

        @Component
        public class MqttConfigProperties {
            @Value("${mqtt.url}")
            private String url;
            @Value("${mqtt.username}")
            private String username;
            @Value("${mqtt.password}")
            private String password;
            @Value("${mqtt.clientId}")
            private String clientId;
            @Value("${mqtt.defaultTopic}")
            private String defaultTopic;
            @Value("${mqtt.keepAliveInterval}")
            private Integer keepAliveInterval;
            @Value("${mqtt.automaticReconnect}")
            private Boolean automaticReconnect;
            @Value("${mqtt.cleanSession}")
            private Boolean cleanSession;
            @Value("${mqtt.connectionTimeout}")
            private Integer connectionTimeout;
            @Value("${mqtt.maxInflight}")
            private Integer maxInflight;
        }
        

        创建客户端配置类

        @Configuration
        @IntegrationComponentScan
        public class MqttConfig {
            @Autowired
            private MqttConfigProperties mqttConfigProperties;
            @Bean
            public MqttConnectOptions mqttConnectOptions() {
                log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties));
                MqttConnectOptions options = new MqttConnectOptions();
                options.setUserName(mqttConfigProperties.getUsername());
                options.setPassword(mqttConfigProperties.getPassword().toCharArray());
                options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
                options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval());
                options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect());
                options.setCleanSession(mqttConfigProperties.getCleanSession());
                options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout());
                options.setMaxInflight(mqttConfigProperties.getMaxInflight());
                return options;
            }
            @Bean
            public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {
                DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
                factory.setConnectionOptions(mqttConnectOptions);
                return factory;
            }
            // 推送通道
            @Bean
            public MessageChannel mqttOutputChannel() {
                return new DirectChannel();
            }
            @Bean
            @ServiceActivator(inputChannel = "mqttOutputChannel")
            public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {
                MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory);
                messageHandler.setAsync(true);
                messageHandler.setDefaultQos(1);
                messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic());
                log.info("初始化mqttOutputChannel...");
                return messageHandler;
            }
        }
        

        发送网关接口

        @MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
        public interface MqttGateway {
            /**
             * 发送消息
             *
             * @param topic
             * @param data
             */
            void send(@Header(MqttHeaders.TOPIC) String topic, String data);
        }
        

        这样,在发送消息时,直接将消息网关注入,调用发送方法就可以发送了

        mqttGateway.send(topic, JSONObject.toJSONString(msg));
        

        参考:

        https://mqtt.org/


免责声明
1、本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明。
2、本网站转载文章仅为传播更多信息之目的,凡在本网站出现的信息,均仅供参考。本网站将尽力确保所
提供信息的准确性及可靠性,但不保证信息的正确性和完整性,且不对因信息的不正确或遗漏导致的任何
损失或损害承担责任。
3、任何透过本网站网页而链接及得到的资讯、产品及服务,本网站概不负责,亦不负任何法律责任。
4、本网站所刊发、转载的文章,其版权均归原作者所有,如其他媒体、网站或个人从本网下载使用,请在
转载有关文章时务必尊重该文章的著作权,保留本网注明的“稿件来源”,并白负版权等法律责任。

手机扫描二维码访问

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

发表评论

快捷回复: 表情:
评论列表 (暂无评论,7786人围观)

还没有评论,来说两句吧...

目录[+]