zoukankan      html  css  js  c++  java
  • Spring Boot 集成 MQTT

    本文代码有些许问题,处理方案见:解决 spring-integration-mqtt 频繁报 Lost connection 错误

    一、添加配置

    spring:
      mqtt:
        client:
          username: 用户名
          password: 密码
          serverURIs: tcp://ip:port # 客户端地址,多个使用逗号隔开
          clientId: client0001 # ${random.value}
          keepAliveInterval: 30
          connectionTimeout: 30
        producer:
          defaultQos: 1
          defaultRetained: true
          defaultTopic: defaultTopicName
        consumer:
          defaultQos: 1
          completionTimeout: 30000
          consumerTopics: topic1,topic2 # 监听的 topic,多个使用逗号隔开
    

    二、客户端配置

        /* 客户端 */
        @Bean
        public MqttConnectOptions getMqttConnectOptions() {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName(username);
            mqttConnectOptions.setPassword(password.toCharArray());
            mqttConnectOptions.setServerURIs(serverURIs);
            mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
            mqttConnectOptions.setConnectionTimeout(connectionTimeout);
    
            return mqttConnectOptions;
        }
    
        @Bean
        public MqttPahoClientFactory getMqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getMqttConnectOptions());
    
            return factory;
        }
    

    三、发布消息

    3.1 配置

        @Bean
        public MessageChannel outboundChannel() {
            return new DirectChannel();
        }
    
        @Bean
        @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
        public MessageHandler getMqttProducer() {
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, getMqttClientFactory());
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic(defaultTopic);
            messageHandler.setDefaultRetained(defaultRetained);
            messageHandler.setDefaultQos(defaultProducerQos);
    
            return messageHandler;
        }
    

    3.2 消息推送接口类

    @MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
    public interface MqttSender {
    
        void sendToMqtt(String data);
    
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    
        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    }
    

    3.3 测试

    @RestController
    public class TestController {
    
        @Autowired
        private MqttSender mqttSender;
    
        @RequestMapping("/send")
        private void send(String data){
            mqttSender.sendToMqtt(data);
        }
    }
    

    四、订阅消息

    4.1 配置

        @Bean
        public MessageChannel inboundChannel() {
            return new DirectChannel();
        }
    
        @Bean
        public MessageProducer getMqttConsumer() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(clientId, getMqttClientFactory(), consumerTopics);
            adapter.setCompletionTimeout(completionTimeout);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(defaultConsumerQos);
            adapter.setOutputChannel(inboundChannel());
    
            return adapter;
        }
    

    4.2 测试

    @Component
    public class MqttConsumer {
        private static final Logger LOGGER = LoggerFactory.getLogger(MqttConsumer.class);
    
        @Bean
        @ServiceActivator(inputChannel = MqttConfig.INBOUND_CHANNEL)
        public MessageHandler handler() {
            return message -> {
                String topic = message.getHeaders().get(MqttConfig.RECEIVED_TOPIC_KEY).toString();
                LOGGER.info("[{}]主题接收到消息:{}", topic, message.getPayload().toString());
            };
        }
    }
    

    注意事项

    1. @ServiceActivator 和 @MessagingGateway 中绑定的 Channel 名,需与返回 MessageChannel 的 Bean 的方法名一样:

      如发布者绑定的 Channel 名为 outboundChannel,则需要有对应的方法,如下:

       @Bean
       public MessageChannel outboundChannel() {
           return new DirectChannel();
       }
      
    2. 发布者与订阅者的 Channel 名不能相同
    3. 连接服务器的超时时间和订阅的超时时间单位不一样

    参考

    1. MQTT系列教程1(基本概念介绍)
    2. SpringBoot - 集成MQTT教程1(发布消息)
    3. SpringBoot - 集成MQTT教程2(订阅消息)

    完整代码:GitHub

  • 相关阅读:
    《构建之法》阅读笔记--2
    学习进度条--第九周
    团队冲刺第十天
    团队冲刺第十一天
    团队冲刺第九天
    团队冲刺第八天(4/29)
    团队冲刺第七天(4/28)
    团队冲刺第六天( 2016/4/27)
    第九章动手动脑
    第八章多态动手动脑
  • 原文地址:https://www.cnblogs.com/victorbu/p/11978107.html
Copyright © 2011-2022 走看看