zoukankan      html  css  js  c++  java
  • emq共享订阅

    1 共享订阅

    多个客户端订阅了同一个主题,发布者发布主题时,每个客户端都会同时收到这个主题的消息。在客户端集群部署的场景下会出现消息重复处理的问题。
    EMQ支持共享订阅,多个客户端订阅了同一个主题,发布者发布主题时,只有其中一个客户端接收到消息。
    共享订阅有两种方式:
    (1)共享订阅:订阅前缀$queue/
    多个客户端订阅了$queue/topic,发布者发布到topic,则只有一个客户端会接收到消息。
    (2)分组订阅:订阅前缀$share/<group>/
    多组客户端订阅了$queue/group1/topic、$queue/group2/topic...,发布者发布到topic,则消息会发布到每个group中,但是每个group中只有一个客户端会接收到消息。

    2 Java客户端实现共享订阅

    开发时发现,使用eclipse paho java客户端时,无法处理共享订阅。订阅$queue/topic能够订阅成功,并且跟踪代码能看到emq也把消息转发到了客户端,但是客户端丢弃掉了。
    解决方法就是重写mqtt的回调函数,实现MqttCallback接口。

    实现MqttCallback接口的代码如下:

    package com.emqtest.emqtest;
    
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttTopic;
    
    public class SharedSubCallbackRouter implements MqttCallback {
        private Map<String, IMqttMessageListener> topicFilterListeners;
    
        public SharedSubCallbackRouter(Map<String, IMqttMessageListener> topicFilterListeners) {
            this.topicFilterListeners = topicFilterListeners;
        }
    
        public void addSubscriber(String topicFilter, IMqttMessageListener listener) {
            if (this.topicFilterListeners == null) {
                 this.topicFilterListeners = new HashMap<>();
            }
            this.topicFilterListeners.put(topicFilter, listener);
        }
    
        @Override
        public void connectionLost(Throwable cause) {
    
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            for (Map.Entry<String, IMqttMessageListener> listenerEntry : topicFilterListeners.entrySet()) {
                String topicFilter = listenerEntry.getKey();
                if (isMatched(topicFilter, topic)) {
                    listenerEntry.getValue().messageArrived(topic, message);
                }
            }
        }
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
    
        }
    
        /**
         * Paho topic matcher does not work with shared subscription topic filter of emqttd
         * https://github.com/eclipse/paho.mqtt.java/issues/367#issuecomment-300100385
         * <p>
         * http://emqtt.io/docs/v2/advanced.html#shared-subscription
         *
         * @param topicFilter the topicFilter for mqtt
         * @param topic       the topic
         * @return boolean for matched
         */
        private boolean isMatched(String topicFilter, String topic) {
            if (topicFilter.startsWith("$queue/")) {
                topicFilter = topicFilter.replaceFirst("\$queue/", "");
            } else if (topicFilter.startsWith("$share/")) {
                topicFilter = topicFilter.replaceFirst("\$share/", "");
                topicFilter = topicFilter.substring(topicFilter.indexOf('/'));
            }
            return MqttTopic.isMatched(topicFilter, topic);
        }
    }
    

    创建emq连接代码如下:

    mqttClient = new MqttClient("tcp://localhost:1883", "MqttClient");
    mqttClient.connect();
    Map<String, IMqttMessageListener> listeners = new HashMap<>();
    IMqttMessageListener emqListener = new EmqListener();
    listeners.put("$queue/testmqtt", emqListener);
    mqttClient.setCallback(new SharedSubCallbackRouter(listeners));
    mqttClient.subscribe("$queue/testmqtt", new EmqListener());
    

    还要再写一个实现IMqttMessageListener接口的Emq消息处理类:

    @Component
    public class EmqListener implements IMqttMessageListener {
    
      @Override
      public void messageArrived(String topic, MqttMessage message) throws Exception {
        try {
          System.out.println("topic: " + topic);
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
    

      

    参考链接:

    1 emq的github上关于这个问题的讨论:
    https://github.com/emqx/emqx/issues/921#event-1023359646
    2 网上有人给的一个解决方法示例代码:
    https://github.com/yogin16/paho-shared-sub-example
    3 eclipse paho的github链接:
    https://github.com/eclipse/paho.mqtt.java

  • 相关阅读:
    mysql general log使用介绍
    是否可以根据GTID 选出日志最新的实例
    python踩坑现场,看起来一样的两个字符串,却不相等
    sql case when的使用
    golang 匿名结构体成员,具名结构体成员,继承,组合
    golang go-sql-driver/mysql基本原理
    raft协议中的日志安全性
    go get 安装 go.etcd.io etcd clientv3 报错
    ZGC
    发现jdk9之后,AQS代码有啥变化了吗
  • 原文地址:https://www.cnblogs.com/lasdaybg/p/11419711.html
Copyright © 2011-2022 走看看