zoukankan      html  css  js  c++  java
  • SpringBoot2.x集成MQTT实现消息订阅

    1.引入相关的依赖

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

    2. 在配置文件下配置MQTT服务器信息

    spring.mqtt.username = username
    spring.mqtt.password = password
    spring.mqtt.url
    = tcp://xx.xx.xx.xx:18083
    spring.mqtt.client.id = clientid
    spring.mqtt.
    default.topic = topic
    spring.mqtt.
    default.completionTimeout = 3000

    3.配置MQTT消息推送配置

    /**
     * MQTT配置
     * @Author: songyaru
     * @Date: 2020/9/01 10:04
     * @Version 1.0
     */
    
    @Slf4j
    @Configuration
    @IntegrationComponentScan
    public class MqttReceiveConfig {
    
        @Value("${spring.mqtt.username}")
        private String username;
    
        @Value("${spring.mqtt.password}")
        private String password;
    
        @Value("${spring.mqtt.url}")
        private String hostUrl;
    
        @Value("${spring.mqtt.client.id}")
        private String clientId;
    
        @Value("${spring.mqtt.default.topic}")
        private String defaultTopic;
    
        @Value("${spring.mqtt.default.completionTimeout}")
        private int completionTimeout;//连接超时
    
        //初始化连接
        @Bean
        public MqttConnectOptions getMqttConnectOptions() {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName(username);
            mqttConnectOptions.setPassword(password.toCharArray());
            mqttConnectOptions.setServerURIs(new String[]{hostUrl});
            mqttConnectOptions.setKeepAliveInterval(50);
            return mqttConnectOptions;
        }
    
        //初始化mqtt工厂
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getMqttConnectOptions());
            return factory;
        }
    
        //接收通道
        @Primary
        @Bean("mqttInputChannel")
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }
    
        //配置client,监听的topic
        @Bean
        public MessageProducer inbound(@Qualifier("mqttInputChannel") MessageChannel messageChannel) {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory(),defaultTopic); //这里的defaultTopic是发布者的主题               
            adapter.setCompletionTimeout(completionTimeout);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(messageChannel);
            return adapter;
        }
    
    
       //订阅消费数据,通过通道获取数据
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    log.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());
                }
            };
        }
    
    }

    4.启动服务,使用上一篇博文的消息接口发送消息。

  • 相关阅读:
    Android AdapterView View的复用机制 分析
    go12---interface
    go11---方法method
    go10---struct
    go09---defer
    go8---函数function
    go7---map
    go6---slice切片
    go5--数组
    go4--break,continue + 标签
  • 原文地址:https://www.cnblogs.com/songyaru/p/13596660.html
Copyright © 2011-2022 走看看