最近一年的项目实战中用到了 MQTT 开发,今天大致总结一下 MQTT 在 Spring Boot 中的使用方法。
1、引入依赖(jar)
<!-- MQTT support: Spring Integration starter plus its stream and MQTT modules -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Spring Boot configuration processor, declared as an optional dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
2、项目启动时建立连接
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import com.slife.cws.mqtt.component.MqttPushClient; import com.slife.cws.mqtt.config.MqttConfig; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class MqttApplicationRunner implements ApplicationRunner { @Autowired private MqttConfig mqttConfig; @Override public void run(ApplicationArguments args) throws Exception { if (log.isInfoEnabled()) { log.info("===============>>>Mqtt is run starting:<<=================="); } MqttPushClient mqttPushClient = new MqttPushClient(); mqttPushClient.connect(mqttConfig); // 订阅主题 mqttPushClient.subscribe(mqttConfig.getTopic(), mqttConfig.getQos()); } }
3、相关配置及实现类
①、配置
application.properties 写法:

#spring.mqtt.url=tcp://127.0.0.1
spring.mqtt.url=tcp://127.0.0.1
spring.mqtt.username= nbew
spring.mqtt.password= 123456
spring.mqtt.client-id= 100201101
spring.mqtt.topics= top
spring.mqtt.completion-timeout= 3000
spring.mqtt.timeout= 120
spring.mqtt.keep-alive= 20
spring.mqtt.qos= 1,1

application.yml 写法:

spring:
  mqtt:
    url: tcp://mqtt.rootcloudapp.com:1883
    username: 0ba851e2e83609b9
    password: 81757448a4df0d73
    client-id: 0ba851e2e83609b9
    id: test10000011
    topics: v4/p/post/thing/live/json/1.1
    completion-timeout: 3000
    timeout: 30
    keep-alive: 60
    qos: 1

注意:下面的 MqttConfig 还会读取 spring.mqtt.id,使用 properties 写法时需要补上这一项,否则启动时会因为占位符无法解析而失败;另外 qos 的个数必须与 topics 的个数一致(上面 properties 示例中 topics 只有 1 个而 qos 有 2 个,订阅时会抛出 IllegalArgumentException)。
import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import lombok.Data; //@ConfigurationProperties(prefix = "spring.mqtt") @Data @Component @Configuration public class MqttConfig { /** * 链接url */ @Value("${spring.mqtt.url}") private String url; /** * 用户名 */ @Value("${spring.mqtt.username}") private String username; /** * 密码 */ @Value("${spring.mqtt.password}") private String password; /** * 客户端id */ @Value("${spring.mqtt.client-id}") private String clientId; /** * 通讯标识 id */ @Value("${spring.mqtt.id}") private String id; /** * 主题 */ @Value("${spring.mqtt.topics}") private String[] topic; /** * 超时时间 */ @Value("${spring.mqtt.timeout}") private int timeout; /** * 心跳检测时间 */ @Value("${spring.mqtt.keep-alive}") private int keepAlive; /** * 心跳包级别 */ @Value("${spring.mqtt.qos}") private int[] qos; private int completionTimeout; }
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Registers the MQTT client wrapper with the Spring container.
 *
 * <p>Package: com.slife.cws.mqtt.component
 *
 * @author youli
 * @since 2021-02-16
 * Copyright: Shanghai Chengsheng Technology Co., Ltd.
 */
@Component
public class Mqttbean {

    /**
     * Exposes a {@link MqttPushClient} under the bean name {@code mqttPushClient}.
     *
     * @return a newly constructed client wrapper
     */
    @Bean("mqttPushClient")
    public MqttPushClient getMqttPushClient() {
        return new MqttPushClient();
    }
}
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import com.slife.cws.mqtt.config.MqttConfig; import lombok.extern.slf4j.Slf4j; /** * @Package com.shhw.mqtt.component * @ClassName: MqttPushClient * @Description: MqttClient客户端代码 * @Author youli * @date 2020年10月16日 * @CopyRight:上海成生科技有限公司 */ @Slf4j public class MqttPushClient { private static MqttClient client; public static MqttClient getClient() { return client; } public static void setClient(MqttClient client) { MqttPushClient.client = client; } private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) { // MQTT连接设置 MqttConnectOptions option = new MqttConnectOptions(); // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接 option.setCleanSession(false); // 设置连接的用户名 option.setUserName(userName); // 设置连接的密码 option.setPassword(password.toCharArray()); // 设置超时时间 单位为秒 option.setConnectionTimeout(outTime); // 设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 option.setKeepAliveInterval(KeepAlive); // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 // option.setWill(topic, "close".getBytes(), 2, true); option.setMaxInflight(1000); log.info("================>>>MQTT连接认证成功<<======================"); return option; } /** * 连接 */ public void connect(MqttConfig mqttConfig) { MqttClient client; try { String clientId = mqttConfig.getClientId(); clientId += System.currentTimeMillis(); client = new MqttClient(mqttConfig.getUrl(), clientId, new MemoryPersistence()); MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(), 
mqttConfig.getTimeout(), mqttConfig.getKeepAlive()); MqttPushClient.setClient(client); try { client.setCallback(new PushCallback<Object>(this, mqttConfig)); if (!client.isConnected()) { client.connect(options); log.info("================>>>MQTT连接成功<<======================"); //订阅主题 subscribe(mqttConfig.getTopic(), mqttConfig.getQos()); } else {// 这里的逻辑是如果连接不成功就重新连接 client.disconnect(); client.connect(options); log.info("===================>>>MQTT断连成功<<<======================"); } } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 断线重连 * * @throws Exception */ public Boolean reConnect() throws Exception { Boolean isConnected = false; if (null != client) { client.connect(); if (client.isConnected()) { isConnected = true; } } return isConnected; } /** * 发布,默认qos为0,非持久化 * * @param topic * @param pushMessage */ public void publish(String topic, String pushMessage) { publish(0, false, topic, pushMessage); } /** * 发布 * * @param qos * @param retained * @param topic * @param pushMessage */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null == mTopic) { log.error("===============>>>MQTT topic 不存在<<======================="); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费) * * @param topic * @param pushMessage */ public void publish(int qos, String topic, String pushMessage) { publish(qos, false, topic, pushMessage); } /** * 订阅某个主题,qos默认为0 * * @param topic */ public void subscribe(String[] topic) { 
subscribe(topic, null); } /** * 订阅某个主题 * * @param topic * @param qos */ public void subscribe(String[] topic, int[] qos) { try { MqttPushClient.getClient().unsubscribe(topic); MqttPushClient.getClient().subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } }
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; /** * @Package com.shhw.mqtt.component * @ClassName: MqttSender * @Description: 主题发布 * @Author youli * @date 2020年10月16日 * @CopyRight:上海成生科技有限公司 */ @Component(value = "mqttSender") @Slf4j public class MqttSender { @Async public void send(String queueName, String msg) { log.debug("=====================>>>>发送主题:{}, msg:{}", queueName,msg); publish(2, queueName, msg); } /** * 发布,默认qos为0,非持久化 * * @param topic * @param pushMessage */ public void publish(String topic, String pushMessage) { publish(1, false, topic, pushMessage); } /** * 发布 * * @param qos * @param retained * @param topic * @param pushMessage */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null == mTopic) { log.error("===================>>>MQTT topic 不存在<<================="); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { log.error("============>>>publish fail", e); e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费) * * @param topic * @param pushMessage */ public void publish(int qos, String topic, String pushMessage) { publish(qos, false, topic, pushMessage); } }
import javax.annotation.Resource; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; import com.slife.cws.mqtt.config.MqttConfig; import com.slife.cws.mqtt.pojo.MqttResponseBody; import com.slife.cws.mqtt.pojo.MqttResponseHeartbeat; import com.slife.cws.mqtt.service.MqttService; import com.slife.cws.mqtt.service.impl.MqttServiceImpl; import com.slife.cws.utils.JSONUtils; import com.slife.cws.utils.SpringUtil; import lombok.extern.slf4j.Slf4j; /** * @Package com.shhw.mqtt.component * @ClassName: PushCallback * @Description: 进行双向通信的时候,监听订阅的客户端和主题是否处于连接状态 * @Author youli * @date 2020年10月16日 * @CopyRight:上海成生科技有限公司 */ @Slf4j @Component public class PushCallback<component> implements MqttCallback { private MqttPushClient client; private MqttConfig mqttConfiguration; @Resource MqttService mqttService; public PushCallback(MqttPushClient client, MqttConfig mqttConfiguration) { this.client = client; this.mqttConfiguration = mqttConfiguration; } @Override public void connectionLost(Throwable cause) { /** 连接丢失后,一般在这里面进行重连 **/ if (client != null) { while (true) { try { log.info("==============》》》[MQTT] 连接丢失,尝试重连..."); MqttPushClient mqttPushClient = new MqttPushClient(); mqttPushClient.connect(mqttConfiguration); if (MqttPushClient.getClient().isConnected()) { log.info("=============>>重连成功"); } break; } catch (Exception e) { log.error("=============>>>[MQTT] 连接断开,重连失败!<<============="); continue; } } } log.info(cause.getMessage()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { // publish后会执行到这里 log.info("pushComplete==============>>>" + token.isComplete()); } /** * 监听对应的主题消息 * * @param topic * @param message * @throws Exception */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 log.info("============》》接收消息主题 : " + topic); 
log.info("============》》接收消息Qos : " + message.getQos()); log.info("============》》接收消息内容原始内容 : " + new String(message.getPayload())); log.info("============》》接收消息内容GB2312 : " + new String(message.getPayload(), "GB2312")); log.info("============》》接收消息内容UTF-8 : " + new String(message.getPayload(), "UTF-8")); try { if (topic.equals("datapoint")) { MqttResponseBody mqttResponseBody = JSONUtils.jsonToBean(new String(message.getPayload(), "UTF-8"), MqttResponseBody.class); MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class); mqttService.messageArrived(mqttResponseBody); } else if (topic.equals("heartbeat")) { MqttResponseHeartbeat mqttResponseHeartbeat = JSONUtils .jsonToBean(new String(message.getPayload(), "UTF-8"), MqttResponseHeartbeat.class); MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class); mqttService.messageHeartbeat(mqttResponseHeartbeat); } } catch (Exception e) { e.printStackTrace(); log.info("============》》接收消息主题异常 : " + e.getMessage()); } } }
import com.slife.cws.mqtt.pojo.MqttResponseBody;
import com.slife.cws.mqtt.pojo.MqttResponseHeartbeat;

/**
 * Application-level MQTT operations: sending a message and handling received ones.
 */
public interface MqttService {

    /**
     * Sends a message. The method takes no arguments.
     *
     * @author youli
     * @since 2020-11-09
     */
    void sendMessage();

    /**
     * Handles a received message body.
     *
     * @author youli
     * @since 2021-02-16
     * @param mqttResponseBody the parsed message body
     */
    void messageArrived(MqttResponseBody mqttResponseBody);

    /**
     * Handles a received device heartbeat.
     *
     * @author youli
     * @since 2021-07-04
     * @param mqttResponseHeartbeat the parsed heartbeat message
     */
    void messageHeartbeat(MqttResponseHeartbeat mqttResponseHeartbeat);
}
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.slife.cws.mqtt.component.MqttSender; import com.slife.cws.mqtt.config.MqttConfig; import com.slife.cws.mqtt.pojo.MqttResponseBody; import com.slife.cws.mqtt.pojo.MqttResponseHeartbeat; import com.slife.cws.mqtt.service.MqttService; import com.slife.cws.utils.JSONUtils; import com.slife.ews.service.DatumrealtimeService; import lombok.extern.slf4j.Slf4j; @Service @Slf4j public class MqttServiceImpl implements MqttService { @Autowired MqttConfig mqttConfig; @Autowired private MqttSender mqttSender; @Autowired DataService dataService; @Override public void sendMessage() { String jsonStr = null; mqttSender.send(mqttConfig.getTopic()[0], jsonStr); } @Override public void messageArrived(MqttResponseBody mqttResponseBody) { log.info("接口的消息:{}", JSONUtils.beanToJson(mqttResponseBody)); dataService.save(mqttResponseBody); } @Override public void messageHeartbeat(MqttResponseHeartbeat mqttResponseHeartbeat) { log.info("监听心跳信息:{}", JSONUtils.beanToJson(mqttResponseHeartbeat)); dataService.heartbeat(mqttResponseHeartbeat); } }
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.slife.cws.mqtt.component.MqttPushClient; import com.slife.cws.mqtt.component.MqttSender; import com.slife.cws.mqtt.config.MqttConfig; @RestController public class MessageController { @Autowired MqttConfig mqttConfig; @Autowired private MqttSender mqttSender; @Autowired private MqttPushClient mqttPushClient; /*** * 发布消息,用于其他客户端消息接收测试 */ @RequestMapping("/sendMqttMessage") public String sendMqttMessage(String topic) { String jsonStr = "{"truckPic":"20201029111.jpg","httpRootPic":"url"}"; mqttSender.send(topic, jsonStr); return "ok"; } @RequestMapping("/mqttop") public String mqttop() { String TOPIC1 = "test_topic1"; String TOPIC2 = "test_topic2"; String TOPIC3 = "test_topic3"; String TOPIC4 = "test_topic4"; int Qos1 = 1; int Qos2 = 1; int Qos3 = 1; int Qos4 = 1; String[] topics = { TOPIC1, TOPIC2, TOPIC3, TOPIC4 }; int[] qos = { Qos1, Qos2, Qos3, Qos4 }; mqttPushClient.subscribe(topics, qos); return "订阅主题"; } public static void main(String[] args) { long time = System.currentTimeMillis(); System.out.println(time); } }