zoukankan      html  css  js  c++  java
  • 跟踪mqttv3源码(一)

    Spring整合MQTT

    pom.xml

    <!-- MQTT消息队列 -->
            <dependency> 
                <groupId>org.eclipse.paho</groupId> 
                 <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.0.2</version> 
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.8.1</version>
            </dependency>
            <!-- 消息推送
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>4.1.0</version>
            </dependency>
             -->
            <dependency>  
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-mqtt</artifactId>
                <version>4.3.5.RELEASE</version>
                <exclusions>  
                    <exclusion>  
                        <groupId>org.eclipse.paho</groupId>  
                        <artifactId>mqtt-client</artifactId>  
                    </exclusion>  
                    <exclusion>  
                        <groupId>org.springframework</groupId>  
                        <artifactId>spring-messaging</artifactId>  
                    </exclusion>  
                </exclusions>  
            </dependency>  
            <!--
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-core</artifactId>
                <version>5.0.0.M5</version>
            </dependency>
             -->
            
            <dependency>  
                <groupId>org.fusesource.mqtt-client</groupId>  
                <artifactId>mqtt-client</artifactId>  
                <version>1.14</version>
            </dependency>

    spring-mqtt.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:mqtt="http://www.springframework.org/schema/integration/mqtt"
        xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.3.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.3.xsd">
        
        <!-- 引入配置文件:classpath等同于src目录,两种配置方式 -->
        <context:property-placeholder location="classpath:mqtt.properties"  ignore-unresolvable="true" />
        
        <!-- mqtt客户端订阅消息 -->
        <bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
            <property name="userName" value="${broker.userName}"/>
            <property name="password" value="${broker.password}"/>
        </bean>
    
        <!-- 
        消息适配器 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter 
        org.springframework.messaging.MessageChannel
        org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler
        DefaultPahoMessageConverter
        -->
        <!-- 消息转换器
        <bean id="myConverter" class="org.springframework.integration.mqtt.support.DefaultPahoMessageConverter"></bean>
         -->
         
        <int-mqtt:message-driven-channel-adapter
            id="mqttInbound" 
            client-id="client" 
            url="${broker.host}"
            topics="activate"
            qos="1" 
            client-factory="clientFactory"
            auto-startup="true" 
            send-timeout="12" 
            recovery-interval="10000"
            channel="startCase" />
        
        <!-- 
        <int:channel id="startCase" />
         -->
        
        <!-- 对接收消息进行过滤 @tstamp + ' ' + headers.get('mqtt_topic') + ': ' + payload.toString() + @newline
        <int:transformer id="convert"
            input-channel="startCase"
            expression="payload.toString() + headers.get('mqtt_topic')"
            output-channel="toMqttService" />
         -->
             
        <!-- 方案一 -->
        <int:service-activator id="startCaseService"
            input-channel="startCase" ref="mqttCaseService" method="startCase" />
        
        <!-- 方案二   id="toMqttService" channel="toMqttService"
        <int:outbound-channel-adapter id="toMqttService"  
            ref="mqttCaseService" 
            method="startCase" />
        -->
        
        <bean id="mqttCaseService" class="com.vguang.service.impl.MqttService2"></bean>
        
    </beans>

    实现消息处理类

    public class MqttService2{
        private static final Logger log = LoggerFactory.getLogger(MqttService2.class);
    
        private MqttPahoMessageHandler mqtt;
        private volatile Integer serialno = 0;
        
        public void startCase(Message<String> recmsg){
                    //mqtt5.0
    //        String topic = (String) recmsg.getHeaders().get("mqtt_receivedTopic");
            String topic = (String) recmsg.getHeaders().get("mqtt_topic");
            String payload = recmsg.getPayload();
    
            log.info("消息解析headers结果:{},{}", topic, payload);
        }
    }

    startCase()方法中的参数目前我知道的有三种:

  • 相关阅读:
    Selenium webdriver 操作日历控件
    selenuim-webdriver注解之@FindBy、@FindBys、@FindAll的区别
    配置 mybatis的 log4j.properties
    查询在一个数据库中某个字段存在于哪些表
    Linux下修改Mysql的用户(root)的密码
    MySQL——修改root密码的4种方法(以windows为例)
    报错:1130-host ... is not allowed to connect to this MySql server 开放mysql远程连接 不使用localhost
    C++中的static 成员变量的一些注意点
    #pragma once与#ifndef的区别
    C++类中的成员函数和构造函数为模板函数时的调用方法
  • 原文地址:https://www.cnblogs.com/wangwanchao/p/7516733.html
Copyright © 2011-2022 走看看