依赖
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.3</version> </dependency>
配置
spring:
mqtt:
clientId: test1
url: tcp://192.168.1.24:1883
username: admin
password: 123456
配置类
MyMqttClient.java
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @Component public class MyMqttClient { public static MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; @Autowired private MqttRecieveCallback mqttRecieveCallback; @Autowired private MqttTwoRecieveCallback mqttTwoRecieveCallback; @Value("${spring.mqtt.url}") private String serverURI; @Value("${spring.mqtt.clientId}") private String clientId; @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @PostConstruct public void init() { //初始化连接设置对象 mqttConnectOptions = new MqttConnectOptions(); //初始化MqttClient if (null != mqttConnectOptions) { // true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态 mqttConnectOptions.setCleanSession(true); // 设置连接超时 mqttConnectOptions.setConnectionTimeout(10); //设置账号密码 // mqttConnectOptions.setUserName(username); // mqttConnectOptions.setPassword(password.toCharArray()); // 设置持久化方式 memoryPersistence = new MemoryPersistence(); if (null != memoryPersistence && null != clientId) { try { mqttClient = new MqttClient(serverURI, clientId, memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { } } else { System.out.println("mqttConnectOptions对象为空"); } System.out.println(mqttClient.isConnected()); //设置连接和回调 if (null != mqttClient) { if (!mqttClient.isConnected()) { // 创建回调函数对象 // MqttRecieveCallback mqttReceriveCallback = new MqttRecieveCallback(); // 客户端添加回调函数 // mqttClient.setCallback(mqttReceriveCallback); // 创建连接 try { System.out.println("创建连接"); mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } else { System.out.println("mqttClient为空"); } System.out.println(mqttClient.isConnected()); if (mqttClient.isConnected()) { try { //添加回调方法1 mqttClient.subscribe("topic/test1", 2, mqttRecieveCallback); //添加回调方法2 mqttClient.subscribe("topic/test2", 2, mqttTwoRecieveCallback); } catch (MqttException e) { e.printStackTrace(); } } } // 关闭连接 @PreDestroy public void closeConnect() { //关闭存储方式 if (null != memoryPersistence) { try { memoryPersistence.close(); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("memoryPersistence is null"); } // 关闭连接 if (null != mqttClient) { if (mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("mqttClient is not connect"); } } else { System.out.println("mqttClient is null"); } } // 发布消息 public void publishMessage(String pubTopic, String message, int qos,Boolean retained) { if (null != mqttClient && mqttClient.isConnected()) { System.out.println("发布消息 " + mqttClient.isConnected()); System.out.println("id:" + mqttClient.getClientId()); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(message.getBytes()); mqttMessage.setRetained(retained); MqttTopic topic = mqttClient.getTopic(pubTopic); if (null != topic) { try { MqttDeliveryToken publish = topic.publish(mqttMessage); if (!publish.isComplete()) { System.out.println("消息发布成功"); } } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } else { reConnect(); } } // 重新连接 public void reConnect() { if (null != mqttClient) { if (!mqttClient.isConnected()) { if (null != mqttConnectOptions) { try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("mqttConnectOptions is null"); } } else { System.out.println("mqttClient is null or connect"); } } else { init(); } } // 订阅主题 public void subTopic(String topic) { if (null != mqttClient && mqttClient.isConnected()) { try { mqttClient.subscribe(topic, 1); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("mqttClient is error"); } } // 清空主题 public void cleanTopic(String topic) { if (null != mqttClient && !mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("mqttClient is error"); } } }
回调类一
MqttRecieveCallback.java
import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MqttRecieveCallback implements MqttCallback, IMqttMessageListener { @Autowired private MyMqttClient client; @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topic, MqttMessage message) { System.out.println("Client 接收消息主题 : " + topic); System.out.println("Client 接收消息Qos : " + message.getQos()); System.out.println("Client 接收消息内容 : " + new String(message.getPayload())); /** * 发送消息 */ client.publishMessage("topic/test2","2",2,false); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }
回调类2
MqttTwoRecieveCallback.java
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; @Component public class MqttTwoRecieveCallback implements MqttCallback, IMqttMessageListener { @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Client2 接收消息主题 : " + topic); System.out.println("Client2 接收消息Qos : " + message.getQos()); System.out.println("Client2 接收消息内容 : " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }