zoukankan      html  css  js  c++  java
  • spring boot + mqtt 物联网开发

    最近这一年里,在项目实战中遇到了 MQTT 开发,今天就大致总结一下 MQTT 在 Spring Boot 中的使用。

    1、引入依赖

     <!-- MQTT support built on Spring Integration. No <version> is declared on any entry below;
          presumably versions are managed by the Spring Boot parent/BOM - TODO confirm. -->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.springframework.integration</groupId>
    	<artifactId>spring-integration-stream</artifactId>
    </dependency>
    <!-- NOTE(review): the Java classes in this article import org.eclipse.paho.client.mqttv3 directly;
         presumably that client is pulled in transitively by this artifact - verify. -->
    <dependency>
    	<groupId>org.springframework.integration</groupId>
    	<artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <!-- Declared as optional. -->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-configuration-processor</artifactId>
    	<optional>true</optional>
    </dependency>
    

    2、项目启动时建立连接

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    
    import com.slife.cws.mqtt.component.MqttPushClient;
    import com.slife.cws.mqtt.config.MqttConfig;
    
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * Opens the MQTT connection once the Spring Boot application has started.
     *
     * <p>{@link MqttPushClient#connect(MqttConfig)} subscribes to the configured topics itself after a
     * successful connection, so this runner does not subscribe a second time; a second call would
     * unsubscribe and re-subscribe the same topics for no benefit.
     */
    @Component
    @Slf4j
    public class MqttApplicationRunner implements ApplicationRunner {
    
    	@Autowired
    	private MqttConfig mqttConfig;
    
    	/**
    	 * Connects the shared MQTT client using the injected {@link MqttConfig}.
    	 *
    	 * @param args application arguments (unused)
    	 * @throws Exception declared by {@link ApplicationRunner#run(ApplicationArguments)}
    	 */
    	@Override
    	public void run(ApplicationArguments args) throws Exception {
    		// No isInfoEnabled() guard: the message is a constant, so there is nothing to save.
    		log.info("===============>>>Mqtt is run starting:<<==================");
    		MqttPushClient mqttPushClient = new MqttPushClient();
    		// Connects and, on success, subscribes to mqttConfig.getTopic() with mqttConfig.getQos().
    		mqttPushClient.connect(mqttConfig);
    	}
    
    }
    

    3.相关配置及实现类

    ①、配置

    # application.properties variant of the spring.mqtt.* settings injected into MqttConfig via @Value.
    # NOTE(review): MqttConfig also injects ${spring.mqtt.id}, which is not defined in this variant -
    # confirm it is supplied elsewhere.
    #spring.mqtt.url=tcp://127.0.0.1
    spring.mqtt.url=tcp://127.0.0.1
    spring.mqtt.username= nbew
    spring.mqtt.password= 123456
    spring.mqtt.client-id= 100201101
    spring.mqtt.topics= top
    spring.mqtt.completion-timeout= 3000
    spring.mqtt.timeout= 120
    spring.mqtt.keep-alive= 20
    # NOTE(review): topics lists one entry but qos lists two; both arrays are handed together to
    # MqttPushClient.subscribe(topics, qos) - check whether the lengths are meant to match.
    spring.mqtt.qos= 1,1
    
    # application.yml variant of the same spring.mqtt.* settings.
    # NOTE(review): username / password / client-id below look like real broker credentials kept in
    # plain text - confirm, and consider supplying them from outside the repository.
    spring:
      mqtt:
        url: tcp://mqtt.rootcloudapp.com:1883
        username: 0ba851e2e83609b9
        password: 81757448a4df0d73
        client-id: 0ba851e2e83609b9
        id: test10000011
        topics: v4/p/post/thing/live/json/1.1
        completion-timeout: 3000
        timeout: 30
        keep-alive: 60
        qos: 1
    

      

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    import lombok.Data;
    
    /**
     * MQTT connection settings, injected field by field from the {@code spring.mqtt.*} properties.
     *
     * <p>Getters and setters are generated by Lombok's {@code @Data}. The class declares no {@code @Bean}
     * methods, so it is registered with a single {@code @Component} stereotype.
     */
    @Data
    @Component
    public class MqttConfig {
    
    	/** Broker URL, e.g. {@code tcp://127.0.0.1}. */
    	@Value("${spring.mqtt.url}")
    	private String url;
    
    	/** User name presented to the broker. */
    	@Value("${spring.mqtt.username}")
    	private String username;
    
    	/** Password presented to the broker. */
    	@Value("${spring.mqtt.password}")
    	private String password;
    
    	/** Base client id; {@code MqttPushClient.connect} appends a timestamp to it. */
    	@Value("${spring.mqtt.client-id}")
    	private String clientId;
    
    	/**
    	 * Communication identifier. None of the classes published with this one read it, and one of the
    	 * sample configurations omits it, so it defaults to an empty string instead of failing startup
    	 * when {@code spring.mqtt.id} is absent.
    	 */
    	@Value("${spring.mqtt.id:}")
    	private String id;
    
    	/** Topics to subscribe to (comma-separated in the property). */
    	@Value("${spring.mqtt.topics}")
    	private String[] topic;
    
    	/** Connection timeout handed to the MQTT connect options. */
    	@Value("${spring.mqtt.timeout}")
    	private int timeout;
    
    	/** Keep-alive interval handed to the MQTT connect options. */
    	@Value("${spring.mqtt.keep-alive}")
    	private int keepAlive;
    
    	/** QoS levels passed to {@code subscribe} together with {@link #topic}. */
    	@Value("${spring.mqtt.qos}")
    	private int[] qos;
    
    	/**
    	 * Completion timeout. Previously this field was never bound and therefore always {@code 0};
    	 * it now reads {@code spring.mqtt.completion-timeout} and keeps {@code 0} when the property is
    	 * absent. NOTE(review): unit is presumably milliseconds - confirm.
    	 */
    	@Value("${spring.mqtt.completion-timeout:0}")
    	private int completionTimeout;
    
    }
    

      

    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    /**
     * Registers the {@link MqttPushClient} as a Spring bean.
     *
     * <p>Package {@code com.slife.cws.mqtt.component}; author youli; created 2021-02-16.
     * Copyright: 上海成生科技有限公司
     */
    @Component
    public class Mqttbean {
    
    	/**
    	 * Creates the bean published under the name {@code mqttPushClient}.
    	 *
    	 * @return a newly constructed {@link MqttPushClient}
    	 */
    	@Bean("mqttPushClient")
    	public MqttPushClient getMqttPushClient() {
    		return new MqttPushClient();
    	}
    
    }
    

      

    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    import com.slife.cws.mqtt.config.MqttConfig;
    
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * @Package com.shhw.mqtt.component
     * @ClassName: MqttPushClient
     * @Description: MqttClient客户端代码
     * @Author youli
     * @date 2020年10月16日
     * @CopyRight:上海成生科技有限公司
     */
    @Slf4j
    public class MqttPushClient {
    
    	private static MqttClient client;
    
    	public static MqttClient getClient() {
    		return client;
    	}
    
    	public static void setClient(MqttClient client) {
    		MqttPushClient.client = client;
    	}
    
    	private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) {
    		// MQTT连接设置
    		MqttConnectOptions option = new MqttConnectOptions();
    		// 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
    		option.setCleanSession(false);
    		// 设置连接的用户名
    		option.setUserName(userName);
    		// 设置连接的密码
    		option.setPassword(password.toCharArray());
    		// 设置超时时间 单位为秒
    		option.setConnectionTimeout(outTime);
    		// 设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
    		option.setKeepAliveInterval(KeepAlive);
    		// setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
    		// option.setWill(topic, "close".getBytes(), 2, true);
    		option.setMaxInflight(1000);
    		log.info("================>>>MQTT连接认证成功<<======================");
    		return option;
    	}
    
    	/**
    	 * 连接
    	 */
    	public void connect(MqttConfig mqttConfig) {
    		MqttClient client;
    		try {
    			String clientId = mqttConfig.getClientId();
    			clientId += System.currentTimeMillis();
    			client = new MqttClient(mqttConfig.getUrl(), clientId, new MemoryPersistence());
    			MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(),
    					mqttConfig.getTimeout(), mqttConfig.getKeepAlive());
    			MqttPushClient.setClient(client);
    			try {
    				client.setCallback(new PushCallback<Object>(this, mqttConfig));
    				if (!client.isConnected()) {
    					client.connect(options);
    					log.info("================>>>MQTT连接成功<<======================");
    					 //订阅主题
    					subscribe(mqttConfig.getTopic(), mqttConfig.getQos());
    				} else {// 这里的逻辑是如果连接不成功就重新连接
    					client.disconnect();
    					client.connect(options);
    					log.info("===================>>>MQTT断连成功<<<======================");
    				}
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 断线重连
    	 *
    	 * @throws Exception
    	 */
    	public Boolean reConnect() throws Exception {
    		Boolean isConnected = false;
    		if (null != client) {
    			client.connect();
    			if (client.isConnected()) {
    				isConnected = true;
    			}
    		}
    		return isConnected;
    	}
    
    	/**
    	 * 发布,默认qos为0,非持久化
    	 *
    	 * @param topic
    	 * @param pushMessage
    	 */
    	public void publish(String topic, String pushMessage) {
    		publish(0, false, topic, pushMessage);
    	}
    
    	/**
    	 * 发布
    	 *
    	 * @param qos
    	 * @param retained
    	 * @param topic
    	 * @param pushMessage
    	 */
    	public void publish(int qos, boolean retained, String topic, String pushMessage) {
    		MqttMessage message = new MqttMessage();
    		message.setQos(qos);
    		message.setRetained(retained);
    		message.setPayload(pushMessage.getBytes());
    		MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
    		if (null == mTopic) {
    			log.error("===============>>>MQTT topic 不存在<<=======================");
    		}
    		MqttDeliveryToken token;
    		try {
    			token = mTopic.publish(message);
    			token.waitForCompletion();
    		} catch (MqttPersistenceException e) {
    			e.printStackTrace();
    		} catch (MqttException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained
    	 * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
    	 *
    	 * @param topic
    	 * @param pushMessage
    	 */
    	public void publish(int qos, String topic, String pushMessage) {
    		publish(qos, false, topic, pushMessage);
    	}
    
    	/**
    	 * 订阅某个主题,qos默认为0
    	 * 
    	 * @param topic
    	 */
    	public void subscribe(String[] topic) {
    		subscribe(topic, null);
    	}
    
    	/**
    	 * 订阅某个主题
    	 *
    	 * @param topic
    	 * @param qos
    	 */
    	public void subscribe(String[] topic, int[] qos) {
    		try {
    			MqttPushClient.getClient().unsubscribe(topic);
    			MqttPushClient.getClient().subscribe(topic, qos);
    		} catch (MqttException e) {
    			e.printStackTrace();
    		}
    	}
    }
    

      

    import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;
    
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * @Package com.shhw.mqtt.component
     * @ClassName: MqttSender
     * @Description: 主题发布
     * @Author youli
     * @date 2020年10月16日
     * @CopyRight:上海成生科技有限公司
     */
    @Component(value = "mqttSender")
    @Slf4j
    public class MqttSender {
    
    	@Async
    	public void send(String queueName, String msg) {
    		log.debug("=====================>>>>发送主题:{},  msg:{}", queueName,msg);
    		publish(2, queueName, msg);
    	}
    
    	/**
    	 * 发布,默认qos为0,非持久化
    	 * 
    	 * @param topic
    	 * @param pushMessage
    	 */
    	public void publish(String topic, String pushMessage) {
    		publish(1, false, topic, pushMessage);
    	}
    
    	/**
    	 * 发布
    	 * 
    	 * @param qos
    	 * @param retained
    	 * @param topic
    	 * @param pushMessage
    	 */
    	public void publish(int qos, boolean retained, String topic, String pushMessage) {
    		MqttMessage message = new MqttMessage();
    		message.setQos(qos);
    		message.setRetained(retained);
    		message.setPayload(pushMessage.getBytes());
    		MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
    		if (null == mTopic) {
    			log.error("===================>>>MQTT topic 不存在<<=================");
    		}
    		MqttDeliveryToken token;
    		try {
    			token = mTopic.publish(message);
    			token.waitForCompletion();
    		} catch (MqttPersistenceException e) {
    			log.error("============>>>publish fail", e);
    			e.printStackTrace();
    		} catch (MqttException e) {
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained
    	 * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
    	 * 
    	 * @param topic
    	 * @param pushMessage
    	 */
    	public void publish(int qos, String topic, String pushMessage) {
    		publish(qos, false, topic, pushMessage);
    	}
    
    }
    

      

    import javax.annotation.Resource;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.springframework.stereotype.Component;
    
    import com.slife.cws.mqtt.config.MqttConfig;
    import com.slife.cws.mqtt.pojo.MqttResponseBody;
    import com.slife.cws.mqtt.pojo.MqttResponseHeartbeat;
    import com.slife.cws.mqtt.service.MqttService;
    import com.slife.cws.mqtt.service.impl.MqttServiceImpl;
    import com.slife.cws.utils.JSONUtils;
    import com.slife.cws.utils.SpringUtil;
    
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * @Package com.shhw.mqtt.component
     * @ClassName: PushCallback
     * @Description: 进行双向通信的时候,监听订阅的客户端和主题是否处于连接状态
     * @Author youli
     * @date 2020年10月16日
     * @CopyRight:上海成生科技有限公司
     */
    @Slf4j
    @Component
    public class PushCallback<component> implements MqttCallback {
    
    	private MqttPushClient client;
    
    	private MqttConfig mqttConfiguration;
    
    	@Resource
    	MqttService mqttService;
    
    	public PushCallback(MqttPushClient client, MqttConfig mqttConfiguration) {
    		this.client = client;
    		this.mqttConfiguration = mqttConfiguration;
    	}
    
    	@Override
    	public void connectionLost(Throwable cause) {
    		/** 连接丢失后,一般在这里面进行重连 **/
    		if (client != null) {
    			while (true) {
    				try {
    					log.info("==============》》》[MQTT] 连接丢失,尝试重连...");
    					MqttPushClient mqttPushClient = new MqttPushClient();
    					mqttPushClient.connect(mqttConfiguration);
    					if (MqttPushClient.getClient().isConnected()) {
    						log.info("=============>>重连成功");
    					}
    					break;
    				} catch (Exception e) {
    					log.error("=============>>>[MQTT] 连接断开,重连失败!<<=============");
    					continue;
    				}
    			}
    		}
    		log.info(cause.getMessage());
    	}
    
    	@Override
    	public void deliveryComplete(IMqttDeliveryToken token) {
    		// publish后会执行到这里
    		log.info("pushComplete==============>>>" + token.isComplete());
    	}
    
    	/**
    	 * 监听对应的主题消息
    	 * 
    	 * @param topic
    	 * @param message
    	 * @throws Exception
    	 */
    	@Override
    	public void messageArrived(String topic, MqttMessage message) throws Exception {
    		// subscribe后得到的消息会执行到这里面
    		log.info("============》》接收消息主题 : " + topic);
    		log.info("============》》接收消息Qos : " + message.getQos());
    		log.info("============》》接收消息内容原始内容 : " + new String(message.getPayload()));
    		log.info("============》》接收消息内容GB2312 : " + new String(message.getPayload(), "GB2312"));
    		log.info("============》》接收消息内容UTF-8 : " + new String(message.getPayload(), "UTF-8"));
    		try {
    			if (topic.equals("datapoint")) {
    				MqttResponseBody mqttResponseBody = JSONUtils.jsonToBean(new String(message.getPayload(), "UTF-8"),
    						MqttResponseBody.class);
    				MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
    				mqttService.messageArrived(mqttResponseBody);
    			} else if (topic.equals("heartbeat")) {
    				MqttResponseHeartbeat mqttResponseHeartbeat = JSONUtils
    						.jsonToBean(new String(message.getPayload(), "UTF-8"), MqttResponseHeartbeat.class);
    				MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
    				mqttService.messageHeartbeat(mqttResponseHeartbeat);
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    			log.info("============》》接收消息主题异常 : " + e.getMessage());
    		}
    	}
    
    }
    

      

    import com.slife.cws.mqtt.pojo.MqttResponseBody;
    import com.slife.cws.mqtt.pojo.MqttResponseHeartbeat;
    
    /**
     * Application-level MQTT operations: one outbound send and two handlers for inbound messages.
     */
    public interface MqttService {
    
    	/**
    	 * Sends a message. Takes no arguments; what is sent is decided by the implementation.
    	 *
    	 * <p>Author youli; created 2020-11-09.
    	 */
    	void sendMessage();
    
    	/**
    	 * Handles a received data message.
    	 *
    	 * <p>Author youli; created 2021-02-16.
    	 *
    	 * @param mqttResponseBody the decoded message body
    	 */
    	void messageArrived(MqttResponseBody mqttResponseBody);
    
    	/**
    	 * Handles a received device heartbeat.
    	 *
    	 * <p>Author youli; created 2021-07-04.
    	 *
    	 * @param mqttResponseHeartbeat the decoded heartbeat
    	 */
    	void messageHeartbeat(MqttResponseHeartbeat mqttResponseHeartbeat);
    }
    

      

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import com.slife.cws.mqtt.component.MqttSender;
    import com.slife.cws.mqtt.config.MqttConfig;
    import com.slife.cws.mqtt.pojo.MqttResponseBody;
    import com.slife.cws.mqtt.pojo.MqttResponseHeartbeat;
    import com.slife.cws.mqtt.service.MqttService;
    import com.slife.cws.utils.JSONUtils;
    import com.slife.ews.service.DatumrealtimeService;
    
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * {@link MqttService} implementation: sends through {@link MqttSender} and forwards received
     * messages to {@code DataService}.
     *
     * <p>NOTE(review): {@code DataService} is not among the imports shown for this class; presumably it
     * lives in the same package - confirm.
     */
    @Service
    @Slf4j
    public class MqttServiceImpl implements MqttService {
    
    	@Autowired
    	MqttConfig mqttConfig;
    
    	@Autowired
    	private MqttSender mqttSender;
    
    	@Autowired
    	DataService dataService;
    
    	/**
    	 * Sends to the first configured topic.
    	 */
    	@Override
    	public void sendMessage() {
    		String firstTopic = mqttConfig.getTopic()[0];
    		// NOTE(review): the payload is always null here, and MqttSender.publish dereferences the
    		// message, so this presumably fails with a NullPointerException - TODO supply a real payload.
    		String payload = null;
    		mqttSender.send(firstTopic, payload);
    	}
    
    	/**
    	 * Logs the received body as JSON and stores it through {@code dataService.save}.
    	 *
    	 * @param mqttResponseBody the decoded message body
    	 */
    	@Override
    	public void messageArrived(MqttResponseBody mqttResponseBody) {
    		String bodyJson = JSONUtils.beanToJson(mqttResponseBody);
    		log.info("接口的消息:{}", bodyJson);
    		dataService.save(mqttResponseBody);
    	}
    
    	/**
    	 * Logs the received heartbeat as JSON and passes it to {@code dataService.heartbeat}.
    	 *
    	 * @param mqttResponseHeartbeat the decoded heartbeat
    	 */
    	@Override
    	public void messageHeartbeat(MqttResponseHeartbeat mqttResponseHeartbeat) {
    		String heartbeatJson = JSONUtils.beanToJson(mqttResponseHeartbeat);
    		log.info("监听心跳信息:{}", heartbeatJson);
    		dataService.heartbeat(mqttResponseHeartbeat);
    	}
    
    }
    

      

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.slife.cws.mqtt.component.MqttPushClient;
    import com.slife.cws.mqtt.component.MqttSender;
    import com.slife.cws.mqtt.config.MqttConfig;
    
    @RestController
    public class MessageController {
    
    	@Autowired
    	MqttConfig mqttConfig;
    
    	@Autowired
    	private MqttSender mqttSender;
    
    	@Autowired
    	private MqttPushClient mqttPushClient;
    
    	/***
    	 * 发布消息,用于其他客户端消息接收测试
    	 */
    	@RequestMapping("/sendMqttMessage")
    	public String sendMqttMessage(String topic) {
    		String jsonStr = "{"truckPic":"20201029111.jpg","httpRootPic":"url"}";
    		mqttSender.send(topic, jsonStr);
    		return "ok";
    	}
    
    	@RequestMapping("/mqttop")
    	public String mqttop() {
    		String TOPIC1 = "test_topic1";
    		String TOPIC2 = "test_topic2";
    		String TOPIC3 = "test_topic3";
    		String TOPIC4 = "test_topic4";
    
    		int Qos1 = 1;
    		int Qos2 = 1;
    		int Qos3 = 1;
    		int Qos4 = 1;
    
    		String[] topics = { TOPIC1, TOPIC2, TOPIC3, TOPIC4 };
    		int[] qos = { Qos1, Qos2, Qos3, Qos4 };
    		mqttPushClient.subscribe(topics, qos);
    		return "订阅主题";
    	}
    
    	public static void main(String[] args) {
    		long time = System.currentTimeMillis();
    		System.out.println(time);
    	}
    
    }
    

      

  • 相关阅读:
    Vue:Vue CLI 3的学习
    npm:基础
    Spring Boot:@Value和@ConfigurationProperties
    Spring Boot:引入依赖时何时不指定版本号
    数据库事物的四大特性及隔离级别
    Python之xml文档及配置文件处理(ElementTree模块、ConfigParser模块)
    Python之数据序列化(json、pickle、shelve)
    Python之文件与目录操作(os、zipfile、tarfile、shutil)
    Python之日期与时间处理模块(date和datetime)
    Python之列表生成式、生成器、可迭代对象与迭代器
  • 原文地址:https://www.cnblogs.com/haoliyou/p/15157407.html
Copyright © 2011-2022 走看看