zoukankan      html  css  js  c++  java
  • mqtt 集成

    -- 在pom.xml导入依赖

    <!-- mqtt -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-integration</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-mqtt</artifactId>
            </dependency>
    

      

    -- 在application.yml添加配置

    spring:
    #MQTT配置信息
      mqtt:
        username: bywmqtt
        password: RuizxZWrqNBmgk1h7yd4
        #MQTT-服务器连接地址,如果有多个,用逗号隔开 tcp://39.108.67.63:1883
        url: tcp://39.108.67.63:1883
        client:
          id: test001
        default:
          topic: topicTest01
        completionTimeout: 3000
    

    -- 配置消息通道,连接,监听主题 

    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.IntegrationComponentScan;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageProducer;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;
    
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * mqtt配置
     */
    @Configuration
    @IntegrationComponentScan
    @Slf4j
    public class MqttServerConfig {
    	
    	public static final String OUT_BOUND_CHANNEL = "mqttOutboundChannel";
    	public static final String INPUT_CHANNEL = "mqttInputChannel";
    	public static final String RECEIVE_TOPIC = "mqtt_receivedTopic";
    	public static final String TOPIC_1 = "TOPIC_1";
    	public static final String TOPIC_2 = "TOPIC_2";
    	public static final String[] SUB_TOPIC = {TOPIC_1, TOPIC_2};
    	
        @Value("${spring.mqtt.username:}")
        private String username;
        @Value("${spring.mqtt.password:}")
        private String password;
        @Value("${spring.mqtt.url:}")
        private String hostUrl;
        @Value("${spring.mqtt.client.id:}")
        private String serviceId;
        @Value("${spring.mqtt.default.topic:}")
        private String defaultTopic;
        @Value("${spring.mqtt.completionTimeout:}")
        private int completionTimeout ;   //连接超时
    
        @Bean
        public MqttConnectOptions getMqttConnectOptions() {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName(username);
            mqttConnectOptions.setPassword(password.toCharArray());
            mqttConnectOptions.setServerURIs(new String[]{hostUrl});
            //心跳
            mqttConnectOptions.setKeepAliveInterval(20);
            //连接超时
            mqttConnectOptions.setConnectionTimeout(30);
            return mqttConnectOptions;
        }
    
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getMqttConnectOptions());
            return factory;
        }
    
        @Bean
        @ServiceActivator(inputChannel = OUT_BOUND_CHANNEL)
        public MessageHandler mqttOutbound() {
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(serviceId, mqttClientFactory());
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic(defaultTopic);
            //消息是否永久保留
            messageHandler.setDefaultRetained(true);
            //消息发布服务质量
            messageHandler.setDefaultQos(1);
            return messageHandler;
        }
    
        @Bean
        public MessageChannel mqttOutboundChannel() {
            return new DirectChannel();
        }
        
        //接收通道
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }
        
        //配置client,监听的topic 
        @Bean
        public MessageProducer inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(serviceId+"_inbound", mqttClientFactory(), SUB_TOPIC);
            adapter.setCompletionTimeout(completionTimeout);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }
        
        //通过通道获取数据
        @Bean
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    String topic = message.getHeaders().get(RECEIVE_TOPIC).toString();
                    log.info("[{}]主题接收到消息:{}", topic, message.getPayload().toString());
                }
            };
        }
        
    }
    

    -- 发送消息

    @Autowired
    	private MqttGateway mqttGateway;
    	
    	@RequestMapping("/sendMqtt")
    	public ReturnResult sendMqtt(String topic, String  sendData){
    		// 默认主题发送
    		mqttGateway.sendToMqtt(sendData);
    		// 指定主题推送
    		mqttGateway.sendToMqtt(topic, sendData);
    		return ResultBuild.success();
    	}
    

      

  • 相关阅读:
    英语语法学习笔记之名词
    2016年回顾2017年目标之流水账
    英语单词词性
    本机tomcat的server.xml被还原的问题及解决办法
    关闭英文拼写检查,关闭xml验证
    eclipse运行速度优化(解决狂读盘、发布慢、CPU100%等问题)
    mysql中,通过脚本设置表的自增列,及自增步长
    Eclipse调试 : step into,step over,step return 说明
    几个分布方法及距离方法
    国内其他的maven库
  • 原文地址:https://www.cnblogs.com/yun965861480/p/11910439.html
Copyright © 2011-2022 走看看