SpringBoot集成Mqtt发送消息
1. MQTT简介
MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即消息队列传输探测,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。
相关概念
三种身份:
- 客户端(Client):MQTT 客户端是发送和接收消息的应用程序。
- 服务器(Broker):也叫“代理”,服务器是处理消息的应用程序,位于发布者和订阅者中间,负责接收消息,并按照某种规则发送给订阅者。
- 主题(Topic): 主题是消息的标识符,用于区分不同类型的消息。
MQTT 消息
MQTT传输的消息可以分为:主题(topic)和负载(payload)两部分
- 主题,可以理解为消息的类型
- 负载,可以理解为消息的内容
消息服务质量QoS(Quality of Service)
Qos用于保证在不同的网络环境下消息传递的可靠性,分为3个等级
- 0 消息最多传递一次,消息发布完全依赖底层TCP/IP网络,可能会发生消息丢失, 也就是发出去就不管了,也被叫做“即发即弃”
- 1 消息传递至少 1 次,确保消息到达,但消息重复可能会发生,发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
- 2 消息仅传送一次,确保消息到达一次
2. SpringBoot集成Mqtt
Spring集成Mqtt常用的有两种方式,一种是直接使用Mqtt的客户端库,如Eclipse Paho,另外一种是spring integration mqtt
第一种:使用Mqtt客户端库
依赖引入:org.eclipse.paho.client.mqttv3
org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0
服务端配置
public class MqttSendMsgService { private static String clientId = "test"; private static String username = "admin"; private static String password = "xxxxxx"; private static String broker = "tcp://xxxxx:1883"; public ReturnT mqttSend(String param) { MqttClient client; try { client = new MqttClient(broker, clientId, new MemoryPersistence()); client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { System.out.println("Connection lost: " + cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("Message arrived: " + mqttMessage.getPayload()); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("Delivery complete"); } }); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(username); connOpts.setPassword(password.toCharArray()); client.connect(connOpts); log.info("Connected to MQTT Broker!"); //主题 String topic="test/simple"; //消息 String content="发送测试"; MqttMessage message = new MqttMessage(); message.setQos(1); message.setRetained(false); message.setPayload(content.getBytes()); //消息发送 client.publish(topic,message); } catch (MqttException e) { e.printStackTrace(); } return ReturnT.SUCCESS; } }
上面这种使用起来比较简单,生产环境使用最多的还是下面这种
第二种:使用 Spring integration进行集成,这里以发送消息为例
依赖引入
org.springframework.integration spring-integration-mqtt 5.5.14
添加yaml配置
mqtt.url = tcp://xxxxx:1883 mqtt.username = admin mqtt.password = 123456 mqtt.clientId = test mqtt.defaultTopic = /test/send mqtt.keepAliveInterval = 60 mqtt.automaticReconnect = true mqtt.cleanSession = false mqtt.connectionTimeout = 30 mqtt.maxInflight = 1024
添加对应的属性配置类
@Component public class MqttConfigProperties { @Value("${mqtt.url}") private String url; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.clientId}") private String clientId; @Value("${mqtt.defaultTopic}") private String defaultTopic; @Value("${mqtt.keepAliveInterval}") private Integer keepAliveInterval; @Value("${mqtt.automaticReconnect}") private Boolean automaticReconnect; @Value("${mqtt.cleanSession}") private Boolean cleanSession; @Value("${mqtt.connectionTimeout}") private Integer connectionTimeout; @Value("${mqtt.maxInflight}") private Integer maxInflight; }
创建客户端配置类
@Configuration @IntegrationComponentScan public class MqttConfig { @Autowired private MqttConfigProperties mqttConfigProperties; @Bean public MqttConnectOptions mqttConnectOptions() { log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties)); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttConfigProperties.getUsername()); options.setPassword(mqttConfigProperties.getPassword().toCharArray()); options.setServerURIs(new String[]{mqttConfigProperties.getUrl()}); options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval()); options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect()); options.setCleanSession(mqttConfigProperties.getCleanSession()); options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout()); options.setMaxInflight(mqttConfigProperties.getMaxInflight()); return options; } @Bean public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(mqttConnectOptions); return factory; } // 推送通道 @Bean public MessageChannel mqttOutputChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "mqttOutputChannel") public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory); messageHandler.setAsync(true); messageHandler.setDefaultQos(1); messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic()); log.info("初始化mqttOutputChannel..."); return messageHandler; } }
发送网关接口
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel") public interface MqttGateway { /** * 发送消息 * * @param topic * @param data */ void send(@Header(MqttHeaders.TOPIC) String topic, String data); }
这样,在发送消息时,直接将消息网关注入,调用发送方法就可以发送了
mqttGateway.send(topic, JSONObject.toJSONString(msg));
参考:
https://mqtt.org/
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。
还没有评论,来说两句吧...