视频说明:https://www.bilibili.com/video/BV1qf4y1n7js/
关于MQTT
做一个无人船项目,使用MQTT通信。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议轻量、简单、开放且易于实现,这些特点使它的适用范围非常广泛,在很多场景下都可以使用,包括资源受限的环境,例如机器与机器(M2M)通信和物联网(IoT)。
发布订阅
服务端
服务端使用 mosquitto
下载页面:https://mosquitto.org/download/
客户端
MQTTX
下载页面:https://mqttx.app/#download
MQTT.fx
下载链接:http://www.jensd.de/apps/mqttfx/1.7.1/mqttfx-1.7.1-windows-x64.exe
paho
https://github.com/eclipse/paho.mqtt.java
Paho 是 Eclipse 提供的 MQTT 客户端开源库,在 Java 代码中集成该客户端即可收发消息。
代码
依赖
pom.xml
<!-- MQTT support: Spring Integration's MQTT module. No <version> element is declared here, so the version is presumably managed by a parent POM/BOM (TODO confirm). -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
类MqttConfig
Spring Integration(Spring 的集成框架)提供消息入站通道(用来接收消息)和出站通道(用来发送消息)。
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.util.Date;
/**
 * Spring Integration wiring for MQTT: one inbound adapter that consumes messages from the
 * broker and one outbound handler that publishes messages to it.
 */
@Configuration
public class MqttConfig {

    /** Broker address shared by the inbound and the outbound client. */
    private static final String BROKER_URI = "tcp://127.0.0.1:1883";

    /** Name of the message header that carries the topic a message was received on. */
    private static final String RECEIVED_TOPIC_HEADER = "mqtt_receivedTopic";

    /** Matches per-sensor topics such as {@code 1/sensor}; compiled once instead of per message. */
    private static final java.util.regex.Pattern SENSOR_TOPIC =
            java.util.regex.Pattern.compile(".+/sensor");

    // ---------------------------------------------------------------- consuming messages

    /**
     * Creates the {@link MqttPahoClientFactory} and sets the MQTT broker connection options.
     * If SSL verification is used, it would be configured here as well.
     *
     * @return the client factory used by both the inbound adapter and the outbound handler
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{BROKER_URI});
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Channel that received MQTT messages are delivered to.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter: subscribes to the topics {@code boat}, {@code collector},
     * {@code battery} and {@code +/sensor} with QoS 1 and forwards every received message to
     * {@link #mqttInputChannel()}.
     *
     * @return the message-driven channel adapter
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                "consumerClient-paho", mqttClientFactory(), "boat", "collector", "battery", "+/sensor");
        adapter.setCompletionTimeout(5000);
        // Payloads are handed over as byte[]; handler() decodes them explicitly.
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handles messages consumed from MQTT. The {@code @ServiceActivator} annotation marks this
     * bean as the consumer of the channel named by {@code inputChannel}.
     *
     * <p>Messages are dispatched by the topic they were received on: {@code <sn>/sensor}
     * messages are reported per sensor, {@code collector} messages are reported as collector
     * messages, and everything else is reported as discarded.
     *
     * @return the handler attached to {@code mqttInputChannel}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            // The inbound converter delivers byte[]; calling toString() on the array would
            // only print its identity (e.g. "[B@1a2b3c"), not the message text.
            String payload = decodePayload(message.getPayload());

            Object topicHeader = message.getHeaders().get(RECEIVED_TOPIC_HEADER);
            if (topicHeader == null) {
                // Without a topic the message cannot be routed; report it instead of
                // failing with a NullPointerException.
                System.out.println("丢弃消息:缺少主题头,负载:" + payload);
                return;
            }
            String topic = topicHeader.toString();

            // Handle the message according to its topic.
            if (SENSOR_TOPIC.matcher(topic).matches()) { // e.g. 1/sensor
                String sensorSn = topic.split("/")[0];
                System.out.println("传感器" + sensorSn + ": 的消息: " + payload);
            } else if (topic.equals("collector")) {
                System.out.println("采集器的消息:" + payload);
            } else {
                System.out.println("丢弃消息:主题【" + topic + "],负载:" + payload);
            }
        };
    }

    /**
     * Converts a message payload to text: {@code byte[]} payloads are decoded as UTF-8, any
     * other payload falls back to {@link String#valueOf(Object)}.
     *
     * @param payload the raw message payload
     * @return the payload as text
     */
    private static String decodePayload(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, java.nio.charset.StandardCharsets.UTF_8);
        }
        return String.valueOf(payload);
    }

    // ---------------------------------------------------------------- sending messages

    /**
     * Channel that messages to be published are sent to.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Outbound handler that publishes messages arriving on {@code mqttOutboundChannel}.
     * Sending and consuming can share the same {@link MqttPahoClientFactory}.
     *
     * @return the handler attached to {@code mqttOutboundChannel}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler outbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publishClient", mqttClientFactory());
        // Asynchronous mode: sending a message does not block the caller.
        messageHandler.setAsync(true);
        // Fallbacks applied when a message carries no topic / QoS header.
        messageHandler.setDefaultTopic("command");
        messageHandler.setDefaultQos(2);
        // The converter is left with its default settings; payloadAsBytes is not enabled
        // on the sending side.
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        messageHandler.setConverter(converter);
        return messageHandler;
    }
}
接口MqttGateway
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
 * Messaging gateway for publishing to MQTT. Every call is turned into a message on the
 * {@code mqttOutboundChannel} request channel; the overloads differ only in which MQTT
 * headers they set and in the payload type.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends a text payload without setting a topic or QoS header.
     *
     * @param payload the message body
     */
    void sendToMqtt(String payload);

    /**
     * Sends a text payload to the given topic.
     *
     * @param topic   value of the {@link MqttHeaders#TOPIC} header
     * @param payload the message body
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            String payload);

    /**
     * Sends a text payload to the given topic with the given QoS.
     *
     * @param topic   value of the {@link MqttHeaders#TOPIC} header
     * @param qos     value of the {@link MqttHeaders#QOS} header
     * @param payload the message body
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            String payload);

    /**
     * Sends a binary payload to the given topic with the given QoS.
     *
     * @param topic   value of the {@link MqttHeaders#TOPIC} header
     * @param qos     value of the {@link MqttHeaders#QOS} header
     * @param payload the message body as raw bytes
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            byte[] payload);
}
测试
测试方式:使用接口工具,给接口发送消息,从而调用MQTT客户端发布消息
类MqttController
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * Test endpoint: a request to {@code /send/{topic}/{message}} publishes the message through
 * the {@link MqttGateway}.
 */
@RestController
public class MqttController {

    @Resource
    private MqttGateway mqttGateway;

    /**
     * Publishes {@code message} to {@code topic} with QoS 1.
     *
     * @param topic   topic taken from the request path
     * @param message message text taken from the request path
     * @return a confirmation string echoing the message that was sent
     */
    @RequestMapping("/send/{topic}/{message}")
    public String send(@PathVariable String topic, @PathVariable String message) {
        final int qos = 1;
        // Publish to the topic named in the request path.
        mqttGateway.sendToMqtt(topic, qos, message);
        return String.format("send message : %s", message);
    }
}