MQTT 协议基本介绍 较详细,含EMQ搭建服务器
使用Eclipse创建Maven项目参考:使用eclipse创建Spring Boot项目
Spring提供了对多种消息中间件的整合,其中也包括MQTT。具体请参见以下链接:
https://docs.spring.io/spring-integration/reference/html/
Spring整合MQTT步骤如下:
1、创建Spring Boot Maven工程,poxm.xml引入如下依赖:
<!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
2、application.properties文件中增加MQTT配置参数
#MQTT Config com.mqtt.url=tcp://mqttServerhost:1883 com.mqtt.inboundclientid=in_clientid com.mqtt.outboundclientid=out_clientid com.mqtt.topics=+/V1/T/+/topic
3、增加MQTT配置类
import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;import org.springframework.integration.mqtt.support.MqttHeaders; @Configuration public class MqttConfiguration { @Value("${com.mqtt.url}") private String url; @Value("${com.mqtt.topics}") private String topics; @Value("${com.mqtt.inboundclientid}") private String inclientid; @Value("${com.mqtt.outboundclientid}") private String outclientid; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs(url); return factory; } /** InBound Begin 消息接收端 ****/ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { String clientid = inclientid + "_" + System.currentTimeMillis(); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(url, clientid); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); adapter.addTopic(topics); return adapter; } // ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MqttInboundMessageHandler(); // return new MessageHandler() { // // 消息消费 // @Override // public void handleMessage(Message<?> message) throws MessagingException { // String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // System.out.println(topic + ":收到消息 " + message.getPayload().toString()); // } // }; } /** InBound End ****/ /** OutBound Begin 消息发送端 ****/ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /***** * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory * * @return */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { // 在这里进行mqttOutboundChannel的相关设置 MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(outclientid, mqttClientFactory()); messageHandler.setAsync(true); // 如果设置成true,发送消息时将不会阻塞。 messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway { // 定义重载方法,用于消息发送 void sendToMqtt(String data); // 指定topic进行消息发送 void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); } /** OutBound End ****/ }
4、创建类MqttInboundMessageHandler实现MessageHandler用于处理MQTT消息
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; public class MqttInboundMessageHandler implements MessageHandler { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); //String topic = message.getHeaders().get("mqtt_topic").toString(); //低版本使用 mqtt_topic String payload = message.getPayload().toString(); //mqtt消息写入日志 StringBuilder sb = new StringBuilder(); sb.append(" ").append("topic:").append(topic).append(" ").append("payload:").append(payload); logger.info(sb.toString()); } }
日志配置文件为resources下的logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration scan="true" scanPeriod="10 seconds"> <contextName>logback</contextName> <property name="log.path" value="D:/logs/mqttlogs" /> <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" /> <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" /> <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" /> <property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>info</level> </filter> <encoder> <Pattern>${CONSOLE_LOG_PATTERN}</Pattern> <charset>UTF-8</charset> </encoder> </appender> <appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <charset>UTF-8</charset> </encoder> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${log.path}/debug/uiot-mqtt-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>100MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <maxHistory>15</maxHistory> </rollingPolicy> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>debug</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> </appender> <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <charset>UTF-8</charset> </encoder> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${log.path}/info/uiot-mqtt-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>100MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <maxHistory>15</maxHistory> </rollingPolicy> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>info</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> </appender> <appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <charset>UTF-8</charset> </encoder> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${log.path}/warn/uiot-mqtt-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>100MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <maxHistory>15</maxHistory> </rollingPolicy> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>warn</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> </appender> <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <charset>UTF-8</charset> </encoder> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${log.path}/error/uiot-mqtt-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>100MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <maxHistory>15</maxHistory> </rollingPolicy> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>ERROR</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> </appender> <root level="info"> <appender-ref ref="CONSOLE" /> <appender-ref ref="DEBUG_FILE" /> <appender-ref ref="INFO_FILE" /> <appender-ref ref="WARN_FILE" /> <appender-ref ref="ERROR_FILE" /> </root> </configuration>
5、创建RestController,提供通过http发送MQTT消息的API
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/mqtt/send") public class MqttController { @Autowired private MyGateway mqttGateway; @RequestMapping(value="/sendMessage", method=RequestMethod.POST) public String send(@RequestParam String topic, @RequestParam String message) { mqttGateway.sendToMqtt(message, topic); return topic + "send message : " + message; } @RequestMapping(value="/getTest", method=RequestMethod.GET) public String getTest() { return "success"; } @RequestMapping(value="/postTest", method=RequestMethod.POST) public String getTest(String data) { return "success,data=" + data; } }
6、打包方式改为war包,发布到Tomcat。入口App扩展 SpringBootServletInitializer
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; @SpringBootApplication public class App extends SpringBootServletInitializer { public static void main(String[] args) { SpringApplication.run(App.class, args); } @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { return builder.sources(App.class); } }
打包和发布完整pom.xml文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.demo</groupId> <artifactId>utiot-mqtt</artifactId> <version>1.0.0.200311</version> <packaging>war</packaging> <name>utiot-mqtt</name> <!-- <url>http://maven.apache.org</url> --> <repositories> <repository> <id>nexus</id> <name>Team Nexus Repository</name> <url>http://192.168.80.194:8081/nexus/content/groups/public</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>nexus</id> <name>Team Nexus Repository</name> <url>http://192.168.80.194:8081/nexus/content/groups/public</url> </pluginRepository> </pluginRepositories> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <java.version>1.8</java.version> <encoding>UTF-8</encoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> </dependencies> <distributionManagement> <repository> <id>nexus-releases</id> <name>nexus releases</name> <url>http://192.168.80.194:8081/nexus/content/repositories/releases/</url> </repository> <snapshotRepository> <id>nexus-snapshots</id> <name>nexus snapshots</name> <url>http://192.168.80.194:8081/nexus/content/repositories/snapshots/</url> </snapshotRepository> </distributionManagement> <build> <!-- <finalName>mqtt</finalName> --> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>${java.version}</source> <target>${java.version}</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> <skipTests>true</skipTests> </configuration> </plugin> </plugins> </build> </project>
打包方法:右键项目,选择“Show In”—“Terminal”,输入mvn deploy(发布到服务器),或 mvn install (本地打包)
其他参考资料:
Spring 整合MQTT 订阅和推送、代码示例
springboot整合mqtt 连接配置(含用户名、密码)
Spring整合MQTT 参数说明
SpringBoot2.0集成MQTT功能之消息订阅处理 参数配置
物联网架构成长之路(32)-SpringBoot集成MQTT客户端
MQTT服务器
客户端模拟工具: