mqtt消息客户端
package com.cjcx.inter.apimall.beijing.aibee; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; /** * Mqtt 简易客户端 * Topic naming Examples: * Valid Topic subscriptions * Single topic subscriptions * <p> * / * /house * house/room/main-light * house/room/side-light * Using Wildcards * Subscribing to topic house/# * <p> * Covers * <p> * house/room1/main-light * house/room1/alarm * house/garage/main-light * house/main-door * etc * Subscribing to topic house/+/main-light * <p> * covers * <p> * house/room1/main-light * house/room2/main-light * house/garage/main-light * but doesn’t cover * <p> * house/room1/side-light * house/room2/side-light * Invalid Topic Subscriptions * house+ – Reason- no topic level * house# – Reason- no topic level * Publishing to Topics * A client can only publish to an individual topic. That is, using wildcards when publishing is not allowed. * <p> * E.G- To publish a message to two topics you need to publish the message twice */ @Component public class AibeeMqttClient { private Logger logger = LoggerFactory.getLogger(this.getClass()); private String url; private String clientId = null; public static MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; public AibeeMqttClient() { logger.info("AibeeMqttClient come int"); } public void init(String url, String clientId) { if (!StringUtils.hasLength(clientId)) { logger.warn("MQTT clientId is null"); return; } this.url = url; this.clientId = clientId; // 初始化连接设置对象 mqttConnectOptions = new MqttConnectOptions(); // true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态 mqttConnectOptions.setCleanSession(true); // 设置连接超时 mqttConnectOptions.setConnectionTimeout(30); // 设置持久化方式 memoryPersistence = new MemoryPersistence(); try { mqttClient = new MqttClient(url, clientId, memoryPersistence); } catch (MqttException e) { e.printStackTrace(); logger.warn("MQTT new MqttClient() 异常:{}", e); return; } // 设置连接和回调 if (!mqttClient.isConnected()) { // 客户端添加回调函数 mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectionLost(Throwable throwable) { try { logger.info("MQTT 连接已断开, 60秒后重新连接"); Thread.sleep(60 * 1000L); } catch (InterruptedException e) { e.printStackTrace(); } reConnect(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Client 接收消息主题 : " + topic); System.out.println("Client 接收消息Qos : " + message.getQos()); System.out.println("Client 接收消息内容 : " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { logger.info("deliveryComplete"); } @Override public void connectComplete(boolean b, String s) { logger.info("connectComplete:{}, s:{}", b, s); } }); // 创建连接 try { mqttClient.connect(mqttConnectOptions); logger.info("MQTT 连接状态: {}", (mqttClient.isConnected() ? "已连接" : "未连接")); } catch (MqttException e) { e.printStackTrace(); logger.warn("MQTT 连接异常:{}", e); } } else { logger.info("MQTT 连接状态已经连接.."); } } //关闭连接 public void closeConnect() { // 关闭存储方式 if (null != memoryPersistence) { try { memoryPersistence.close(); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { logger.info("memoryPersistence is null"); } // 关闭连接 if (null != mqttClient) { if (mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { logger.info("mqttClient is not connect"); } } else { logger.info("mqttClient is null"); } } // 发布消息 public void publishMessage(String pubTopic, String message, int qos) { if (null != mqttClient && mqttClient.isConnected()) { logger.info("发布消息 " + mqttClient.isConnected()); logger.info("id:" + mqttClient.getClientId()); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(message.getBytes()); MqttTopic topic = mqttClient.getTopic(pubTopic); if (null != topic) { try { MqttDeliveryToken publish = topic.publish(mqttMessage); if (!publish.isComplete()) { logger.info("消息发布成功"); } else { logger.info("消息发布失败"); } } catch (MqttException e) { e.printStackTrace(); } } } else { reConnect(); } } // 重新连接 public void reConnect() { if (null != mqttClient) { if (!mqttClient.isConnected()) { if (null != mqttConnectOptions) { try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { e.printStackTrace(); } } else { logger.info("MQTT 重连 mqttConnectOptions is null"); } } else { logger.info("MQTT 重连 mqttClient is null or connect"); } } else { init(url, clientId); } } // 订阅主题 public void subTopic(String topic, int qos) { if (null != mqttClient && mqttClient.isConnected()) { try { mqttClient.subscribe(topic, qos); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { logger.info("mqttClient is error"); } } // 清空主题 public void cleanTopic(String topic) { if (null != mqttClient && !mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { logger.info("mqttClient is error"); } } }
测试demo
package com.cjcx.inter.apimall.beijing.aibee; import org.apache.commons.lang3.StringUtils; public class ClientTest { public static void main(String[] args) { AibeeMqttClient aibeeMqttClient = new AibeeMqttClient(); aibeeMqttClient.init("tcp://127.0.0.1:1883", "123"); aibeeMqttClient.subTopic("/topic", 0); aibeeMqttClient.subTopic("house/room/a", 1); aibeeMqttClient.subTopic("house/room/b", 1); java.util.Scanner sc = new java.util.Scanner(System.in); int i = 1; StringBuilder sb = new StringBuilder(); sb.append("使用指南: "); sb.append(i++ + "、发送消息,输入 'send topic message'. "); sb.append(i++ + "、退出程序,输入 'exit'. "); System.out.println(sb); String line = sc.nextLine(); // 这个就是用户输入的数据 while (true) { if ("exit".equalsIgnoreCase(line)) { System.out.println("Thanks for using! bye bye."); break; } else if ("?".equals(line)) { System.out.println(sb); } System.out.println("line:" + line); processCommand(aibeeMqttClient, line); line = sc.nextLine(); // 这个就是用户输入的数据 } // aibeeMqttClient.publishMessage("/topic", "i am comming...", 1); aibeeMqttClient.closeConnect(); } private static void processCommand(AibeeMqttClient aibeeMqttClient, String line) { try { if (StringUtils.isBlank(line)) { return; } String[] arr = line.split(" ");//StrUtil.split(line, " "); String command = arr[0]; System.out.println("command:" + command); if ("send".equalsIgnoreCase(command)) { aibeeMqttClient.publishMessage(arr[1], arr[2], 1); } } catch (Exception e) { e.printStackTrace(); } } }