zoukankan      html  css  js  c++  java
  • RocketMQ的使用

    1.如何发布?

    1.1.业务层调用发送mq消息方法  asyncService.sendAliMqttMsg();

    //相关配置
    public static final String TOPIC_TEST = "qbkj_test_mqtt";
    public static final String TOPIC_QUEUE = "qbkj_queue_mqtt";
    public static final String TOPIC_OBSERVE = "qbkj_observe_mqtt";
    /** 排号类型-登记台 */
    public static final String QUEUE_TYPE_REG = "1";
    /** 排号类型-接种台 */
    public static final String QUEUE_TYPE_INJECT = "2";


    //
    TODO:登记台叫号 Map<String, Object> param = new HashMap<String, Object>(); param.put("type", WebSocket.QUEUE_TYPE_REG); param.put("reg", callList); param.put("queueno", bsRegQueue.getQueue()); param.put("childname", bsRegQueue.getName()); param.put("roomcode", bsRegQueue.getRoom()); param.put("localcode", bsRegQueue.getLocalCode2()); if("mqtt".equals(Global.getConfig("quene_mode"))){ asyncService.sendAliMqttMsg(AliMqManager.TOPIC_QUEUE,bsRegQueue.getLocalCode2(), JsonMapper.toJsonString(param)); }else{ WebSocket.sendBroadCast(JsonMapper.toJsonString(param), bsRegQueue.getLocalCode2()); }

    1.2.sendAliMqttMsg()具体方法

    public void sendAliMqttMsg(String topic, String tag, String txt){
        try {
          AliMqManager.sendMqttMsg(topic, tag, txt);
        } catch (UnsupportedEncodingException e) {
          logger.error("sendAliMqttMsg失败{}",e.getMessage());
        }
    }

    1.3.再进入AliMqManager(阿里云 消息队列mq工具类)

        public static boolean sendMqttMsg(String topic, String tag, String txt) throws UnsupportedEncodingException{
            logger.debug("开始发送mq消息{topic:{},tag:{},txt:{}}",topic, tag, txt);
            //循环发送消息
            Message msg =  new Message( //
                    // Message 所属的 Topic
                    topic,
                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
                    "MQ2MQTT",
                    // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,
                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                    txt.getBytes("UTF-8"));
            
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发
            // 注意:不设置也不会影响消息正常收发
            msg.setKey("msg_"+ topic + "_" + tag +System.currentTimeMillis());
    //        msg.putUserProperties("mqttSecondTopic", topic+"/queue/"+tag);
            
            try {
                SendResult sendResult = producer.send(msg);
                // 同步发送消息,只要不抛异常就是成功
                if (sendResult != null) {
                    logger.info("mq消息发送成功 topic:{} msgId:{}",msg.getTopic(),sendResult.getMessageId());
                }
                return true;
            }catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                logger.error("mq消息发送失败 topic:{}",msg.getTopic());
                logger.error("mq发送消息失败{}",e.getMessage());
            }
            return false;
        }

    这里注意一点:上面标记处的producer是静态定义的对象,且被初始化过

    private static Producer producer;
    
    /**
     * 初始化
     *
     */
    public static void init(){
            try {
                Properties properties = new Properties();
                //您在控制台创建的 Producer ID
                properties.put(PropertyKeyConst.ProducerId, Global.getConfig("mq.producerid"));
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                properties.put(PropertyKeyConst.AccessKey,Global.getConfig("mq.accesskey"));
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                properties.put(PropertyKeyConst.SecretKey, Global.getConfig("mq.secretkey"));
                //设置发送超时时间,单位毫秒
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, Global.getConfig("mq.sendmsgtimeoutmillis"));
                // 设置 TCP 接入域名(此处以公共云生产环境为例)
                properties.put(PropertyKeyConst.ONSAddr, Global.getConfig("mq.onsaddr"));
                producer = ONSFactory.createProducer(properties);
                // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
                producer.start();
                logger.info("============阿里云消息队列mq初始化成功================");
            } catch (Exception e) {
                logger.error("阿里云消息队列mq初始化失败,{}",e.getMessage());
            }
            
        }

     配置文件,jeesite.properties

    #========MQ===============================================
    mq.producerid=PID_qbkj_queue_mqtt
    mq.accesskey=******
    mq.secretkey=******
    mq.sendmsgtimeoutmillis=3000
    mq.onsaddr=http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet

    2.如何订阅?

  • 相关阅读:
    Eclipse 代码自动补全设置
    Ubuntu下MySQL的安装及远程连接配置等配置
    Ubuntu 8.04 下安装mcrypt扩展
    Android sdk manager 显示 “Done loading packages”,停下来不动了!
    「Clover 10」杯HE两校联赛(第二轮Day1)
    自招搞到了NUAA降分到一本线的优惠
    NOIP 2009 解题报告
    SD多校模拟赛Day1&Day2
    「Nescafé 29」杯HE两校联赛(第二轮Day2)
    临近比赛要淡定从容
  • 原文地址:https://www.cnblogs.com/banxian-yi/p/10811177.html
Copyright © 2011-2022 走看看