zoukankan      html  css  js  c++  java
  • Java-Mqtt-ActiveMq(1)

    Java-Mqtt-ActiveMq(1)

    通过Java基于Mqtt协议与ActiveMq交互,订阅(获取消息)/推送主题

    1、Maven依赖

     <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-mqtt</artifactId>
     </dependency>

    2、基本逻辑

    1、Mqtt为协议,ActiveMq为消息组件实现Java同消息队列的交互。
    2、消息推送:根据topic、username、password将message推送至特定url。
    3、消息订阅:根据topic订阅消息,回调函数中可查看消息内容,进行业务逻辑处理。

    3、注意事项

    1、线程问题:
      推送:只有当业务场景触发时才调用推送,向消息队列发送消息,发送完成进程结束。
      订阅:服务启动便同消息队列保持连接。
      总结:根据使用场景的不同,推送/订阅分别在不同的线程(线程表示不太合适)中使用,订阅过程中Mqtt客户端一直同服务保持连接;推送过程中Mqtt客户端在完成推送之后自动断开不需要重新连接。
    2、clientId: 最好使用动态clientid(具体原因不详,暂未找到),使用静态clientid回导致无法获取订阅消息。踩坑。

    4、推送-push

    /**
     * Mqtt-推送(接口调用)
     * <p>
     * Mqtt-推送在接口调用时使用,支持向不同服务,不同主题发送消息
     *
     * @author: zy
     * @date: 2020-09-24 16:00
     */
    @Slf4j
    @Component
    public class MqttProducerUtil {
    
        /**
         * Mqtt客户端
         */
        private MqttClient mqttClient;
    
        /**
         * 客户端ID
         */
        private final String clientId = "PUSH" + (int) (Math.random() * 100000000);
    
    
        /**
         * 创建客户端
         *
         * @param mqttCallback 回调函数
         */
        public void setMqttClient(MqttCallback mqttCallback, String url, String userName, String password) throws
                MqttException {
            MqttConnectOptions options = mqttConnectOptions(url, userName, password);
            if (mqttCallback == null) {
                mqttClient.setCallback(new MqttCallBack());
            } else {
                mqttClient.setCallback(mqttCallback);
            }
            mqttClient.connect(options);
        }
    
        /**
         * 连接客户端
         */
        private MqttConnectOptions mqttConnectOptions(String url, String userName, String password) throws MqttException {
            mqttClient = new MqttClient(url, clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(userName);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(60);
            // 同接口业务保持同一进程,无需重复连接
            options.setAutomaticReconnect(false);
            options.setCleanSession(false);
            return options;
        }
    
        /**
         * 向某个主题发布消息 默认qos:1
         *
         * @param topic:发布的主题
         * @param msg:发布的消息
         */
        public void pub(String topic, String msg) throws MqttException {
            log.info("----------开始发布主题:{},消息:{}", topic, msg);
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(msg.getBytes());
            MqttTopic mqttTopic = mqttClient.getTopic(topic);
            MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        }
    
        /**
         * 向某个主题发布消息
         *
         * @param topic: 发布的主题
         * @param msg:   发布的消息
         * @param qos:   消息质量    Qos:0、1、2
         */
        public void pub(String topic, String msg, int qos) throws MqttException {
            log.info("----------开始发布主题:{},消息:{}", topic, msg);
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setPayload(msg.getBytes());
            MqttTopic mqttTopic = mqttClient.getTopic(topic);
            MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        }
    
    
        /**
         * 关闭MQTT连接
         */
        public void close() throws MqttException {
            mqttClient.close();
            mqttClient.disconnect();
        }
    
        /**
         * 接口调用
         *
         * @param topic   主题
         * @param message 消息
         */
        public void doPush(String topic, String message, String url, String userName, String password) throws
                MqttException {
            setMqttClient(new MqttCallBack(), url, userName, password);
            pub(topic, message);
        }
    }

    5、订阅-sub

    /**
     * Mqtt-订阅(服务启动自动加载)
     * <p>
     * Mqtt-订阅为服务启动自动加载,如需新增订阅主题需在配置文件中新增(新服务需重新配置)
     *
     * @author: zy
     * @date: 2020-09-24 16:00
     */
    @Slf4j
    @Component
    public class MqttConsumerUtil {
    
        /**
         * Mqtt客户端
         */
        private MqttClient mqttClient;
    
        /**
         * 客户端ID
         */
        private final String clientId = "SUB" + (int) (Math.random() * 100000000);
    
        @Value("${config.mq-url}")
        private String mqUrl;
    
        @Value("config.mq-username")
        private String mqUserName;
    
        @Value("${config.mq-password}")
        private String mqPassword;
    
        @Value("${config.mq-topiclist}")
        private List<String> mqTopicList;
    
    
        /**
         * 创建客户端
         *
         * @param mqttCallback 回调函数
         */
        public void setMqttClient(MqttCallback mqttCallback) throws MqttException {
            MqttConnectOptions options = mqttConnectOptions();
            if (mqttCallback == null) {
                mqttClient.setCallback(new MqttCallBack());
            } else {
                mqttClient.setCallback(mqttCallback);
            }
            mqttClient.connect(options);
        }
    
        /**
         * 客户端连接
         */
        private MqttConnectOptions mqttConnectOptions() throws MqttException {
            mqttClient = new MqttClient(mqUrl, clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqUserName);
            options.setPassword(mqPassword.toCharArray());
            options.setConnectionTimeout(10);
            options.setAutomaticReconnect(true);
            options.setCleanSession(false);
            return options;
        }
    
        /**
         * 订阅某一个主题 ,此方法默认的的Qos等级为:1
         */
        public void sub() throws MqttException {
            log.info("----------开始订阅主题----------");
            for (String topic : mqTopicList) {
                mqttClient.subscribe(topic);
            }
        }
    
        /**
         * 订阅某一个主题,可携带Qos
         *
         * @param qos 消息质量:0、1、2
         */
        public void sub(int qos) throws MqttException {
            log.info("----------开始订阅主题----------");
            for (String topic : mqTopicList) {
                mqttClient.subscribe(topic, qos);
            }
        }
    
        /**
         * 关闭MQTT连接
         */
        public void close() throws MqttException {
            mqttClient.close();
            mqttClient.disconnect();
        }
    }

    6、回调函数

    /**
     * Mqtt-回调函数
     *
     * @author: zy
     * @date: 2020-09-24 16:01
     */
    @Slf4j
    @Component
    public class MqttCallBack implements MqttCallback {
    
        /**
         * MQTT 断开连接会执行此方法
         */
        @Override
        public void connectionLost(Throwable throwable) {
            log.info("断开了MQTT连接 :{}", throwable.getMessage());
            log.error(throwable.getMessage(), throwable);
        }
    
        /**
         * publish发布成功后会执行到这里
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            log.info("发布消息成功");
        }
    
        /**
         * subscribe订阅后得到的消息会执行到这里
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            //  TODO    此处可以将订阅得到的消息进行业务处理、数据存储
            log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
        }
    }

    7、监听器

    /**
     * Mqtt-监听器
     *
     * @author: zy
     * @date: 2020-09-24 16:14
     */
    @Slf4j
    @Component
    public class MqttListenerUtil implements ApplicationListener<ContextRefreshedEvent> {
    
        private final MqttConsumerUtil server;
    
        @Autowired
        public MqttListenerUtil(MqttConsumerUtil server) {
            this.server = server;
        }
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            try {
                server.setMqttClient(new MqttCallBack());
                server.sub();
            } catch (MqttException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    代码亲测有效,如有疑问请留言!

    往外张望的人在做梦,向内审视的人才是清醒的
  • 相关阅读:
    UIBezierPath 画线
    医保卡
    UITextView 监听 return key的改变
    实现 UISegmentControl 与 UIScrollView的上下级联(分别在相应的方法中加入级联代码)
    webView、scrollView、TableView,为了防止滚动时出现偏移,底部黑框问题等
    UITabBar 设置字体的颜色(选中状态/正常状态)setTitleTextAttributes
    GitLab使用方法
    Dubbo快速入门
    zookeeper的安装
    核心配置文件常用配置标签
  • 原文地址:https://www.cnblogs.com/StefanieYang/p/13725494.html
Copyright © 2011-2022 走看看