1.引入相关的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
2.在配置文件中配置MQTT服务器信息
spring.mqtt.username = username spring.mqtt.password = password
spring.mqtt.url = tcp://xx.xx.xx.xx:18083
spring.mqtt.client.id = clientid
spring.mqtt.default.topic = topic
spring.mqtt.default.completionTimeout = 3000
3.配置MQTT消息推送配置
/** * @Author: songyaru * @Date: 2020/9/1 13:42 * @Version 1.0 */ @Configuration @IntegrationComponentScan public class MqttSenderConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; @Value("${spring.mqtt.default.completionTimeout}") private int completionTimeout; @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
4.MQTT消息推送接口
/** * @Author: songyaru * @Date: 2020/9/1 13:51 * @Version 1.0 */ @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic); }
5.MQTT消息推送API
/** * @Author: songyaru * @Date: 2020/9/1 13:52 * @Version 1.0 */ @RestController public class MessageController { @Autowired MqttGateway mqttGateway; /*** * 发布消息,用于其他客户端消息接收测试 */ @RequestMapping("/sendMqttMessage") public String sendMqttMessage(String message, String topic) { mqttGateway.sendToMqtt(message, topic); return "ok"; } }
6、测试
在POSTMAN中进行测试了,输入消息内容和主题,就可以在相应的频道发送消息了。使用其它的消息客户端进行测试,可以接受到消息。