zoukankan      html  css  js  c++  java
  • RocketMQ 使用情况梳理

    个人梳理有限:欢迎大家 丰富此文档

    2018年 12 月 RocketMQ 版本  不适用于 新版关系请勿参考
    目前规划原则:

             topic 创建基于业务  消费者基于模块 多对多关系 且消费自己的topic 不会影响别人  topic n↔n  CID

    基于业务topic 的分布表格: (后续有模块更新请自行更新文档或者联系我补上)

     

    MQ: 使用原则和规范:

          正确的顺序: 是先启动Consumer 后再启动producer。

    1.   所有业务目前使用同一个生产者 PID
    2.   所有topic 由主账号创建 并授权给子账号(dev/prod)
    3.   topic 的创建基于业务(首次登陆,支付成功,行程结束等等)  
    4.   CID(消费者 ID)的创建基于应用 (每个应用如果需要创建一个CID 若需要(广播和集群)两种消费模式则创建两个CID  广播方式后缀加_BROADCAST区分)
    5.   Consumer ID 和 Topic 的关系是 N:N。 同一个 Consumer ID 可以订阅多个 Topic,同一个 Topic 也可以对应多个 Consumer ID。
    6.   消息订阅一致性( 同一 CustomerID 的所有使用的模块 订阅的 topic tag 数量需要完全一致 )
    7.   CID只消费自己授权订阅的 topic. 

    MQ 使用情况总结: 

    1. 主账号创建的CustomerID  以主账号(或有最高权限的授权用户)的Access/Secret 的身份的登录  启动实例   消费者在线 且可以接收消息 并且可以突破订阅限制 订阅谁可以消费谁  (前提是订阅关系一致性 同一 CustomerID 的所有使用的模块 订阅的 topic tag 数量需要完全一致 )

    2. 主账号创建的CustomerID 以子账号dev(普通权限用户)Access/Secret 的身份的登录  启动实例  会出现 topic 消费者 不在线状态 (同当日线上状态)  6月之前建立的topic 和 CID 由于阿里云有补偿机制 仍旧可以运行. (这也是导致上线失败的原因: 当时线下用的 dev 具有最高权限 ,线上 prod 是普通用户权限)

    3. 子账号dev 登录阿里云,创建不同CustomerID 后 ,以子账号(普通权限用户) devAccess/Secret 的身份的登录  启动实例    消费者在线  且可以接收消息 并且可以突破订阅限制 订阅谁可以消费谁 但仅限于消费(子账号)  被授权的 topic.  
      未授权CID 为topic的消费者时 因为子账号有订阅消费权限 所以 子账号创建的 CID 可以订阅和消费 topic 但是不影响其他模块(其他 CID)消费

     

     

    可行方案一

        每个模块实例都使用同一个子账号(Access/Secret相同)  不同模块使用使用同一个 CID时, 需要做到 消息订阅一致性( 同一 CustomerID 的所有使用的模块 订阅的 topic tag 数量需要完全一致 )

     

    可行方案二: (个人推荐方案)

        每个模块实例都使用同一个子账号(Access/Secret相同)  每个模块单独分配自己的CID(同一子账号dev 统一创建), 模块之间数据隔离, 要求各个模块自能用自己的 CID 且不要订阅自己模块不该订阅的 topic 和 tag    (缺点:同一个子账号dev [Access/Secret相同] 订阅谁可以消费谁(但是不响应其他模块) 只要子账号被授权的 topic  每个CID 都可以订阅该topic)

         1.主账号登录并创建topic

         2.授权订阅权限给子账号(账号不能访问未授权的 topic)

         3.子账号登录 topic管理中创建自己账号下的CID

         4.程序中使用 同一子账号(Access/Secret) 但是自己模块的 CID 消费消息 相互不影响

     

    最强隔离方案:

      每个模块实例都使用不同子账号(Access/Secret不同)  每个模块单独分配自己子账号创建的CID.这样模块之间可以保障不能相互订阅和消费.

         1.主账号登录并创建topic

         2.授权订阅权限给子账号(账号不能访问未授权的 topic)

         3.子账号登录 topic管理中创建自己账号下的CID

         4.程序中使用 不同子账号(Access/Secret不同)下自己模块的 CID 消费消息 相互不影响

     

     

    topic n↔n  CID 图解:

     

    https://pic2.zhimg.com/80/v2-b6ed65f370a766620718ad4227d5d4e5_hd.jpg

     奉上  官方文档:  https://help.aliyun.com/document_detail/34411.html?spm=a2c4g.11186623.4.5.565f7b25vcsskW

              官方DEMO:  https://github.com/AliwareMQ/mq-demo?spm=a2c4g.11186623.2.14.578018aaaNZL17

              RocketMQ源码分析辅助: https://www.processon.com/view/5a6eb653e4b05680c3e94fec

     

    个人梳理有限:欢迎大家 丰富此文档

     

     

     

    测试用例:

    import com.alibaba.fastjson.JSON;
    import com.aliyun.openservices.ons.api.*;
    import com.xxx.engine.ui.controller.pay.base.BaseTest;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.test.context.TestPropertySource;
    import org.springframework.test.context.web.WebAppConfiguration;

    import java.util.Properties;


    /**
    *
    * topic:
    *
    * topic_dev_xxx_trip_tomas
    * topic_dev_xxx_pay_tomas
    *
    * CID:
    *
    * CID_DEV_xxx_TRIP_TOMAS 作为 topic: trip_tomas 和pay_tomas 消费者
    * CID_DEV_xxx_COORD_TOMAS 作为 topic: trip_tomas 和pay_tomas 消费者
    * CID_DEV_xxx_TAKING_TOMAS 作为 topic: trip_tomas 消费者
    * CID_DEV_xxx_ORDER_TOMAS 作为 topic: pay_tomas 消费者
    * CID_DEV_xxx_COUPON_TOMAS 只建立不需要授权 topic
    */

    @Slf4j
    @SpringBootTest(classes={PayMQTest.class})
    @ComponentScan(basePackages = {"com.xxx.engine"})
    @TestPropertySource("classpath:engine_common.properties")
    @WebAppConfiguration
    public class PayMQTest extends BaseTest {


    @Value("${xxx.mq.tag.pay.notice:pay_notice}")
    private String MQ_TAG_PAY;

    private void sendTestMQ(String topic, String tag) {
    Properties properties = new Properties();
    // 您在MQ控制台创建的Producer ID
    properties.put(PropertyKeyConst.ProducerId, "PID_xxx_DEV");
    // 鉴权用AccessKey,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.AccessKey,"xxxx");
    // 鉴权用SecretKey,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.SecretKey, "xxxxxx");
    // 设置 TCP 接入域名(此处以公共云的公网接入为例)
    properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
    Producer producer = ONSFactory.createProducer(properties);
    // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可
    producer.start();
    //循环发送消息
    for (int i=10000;i<10005;i++){
    try {
    Thread.sleep(100L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    String message = topic.concat(tag).concat("消息内容:").concat(String.valueOf(i));
    Message msg = new Message( //
    // 在控制台创建的Topic,即该消息所属的Topic名称
    topic,
    // Message Tag,
    // 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
    tag,
    // Message Body
    // 任何二进制形式的数据, MQ不做任何干预,
    // 需要Producer与Consumer协商好一致的序列化和反序列化方式
    message.getBytes());
    // 设置代表消息的业务关键属性,请尽可能全局唯一,以方便您在无法正常收到消息情况下,可通过MQ控制台查询消息并补发
    // 发送消息,只要不抛异常就是成功
    // 打印Message ID,以便用于消息发送状态查询
    SendResult sendResult = producer.send(msg);
    System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
    }
    // 在应用退出前,可以销毁Producer对象
    producer.shutdown();
    }

    /**
    * 持续发送1w 条 MQ 消息
    * @throws Exception
    */
    @Test
    public void sendMQ() throws Exception {
    sendTestMQ("topic_dev_xxx_trip_tomas", MQ_TAG_PAY);
    sendTestMQ("topic_dev_xxx_pay_tomas", MQ_TAG_PAY);
    }
    /**
    * 子账号dev 子账号(普通权限用户) dev Access/Secret 的身份的登录
    * CID 由子账号创建 并订阅topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
    * 启动实例 消费者在线 且可以接收消息
    * @throws Exception
    */
    @Test
    public void WithAuthCID() throws Exception {
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
    //consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
    consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());
    consumer.start();
    System.out.println("reciveMQByDevWithAuthCID Started.");
    Thread.sleep(1000000000000l);
    }
    /**
    * 子账号dev 子账号(普通权限用户) dev Access/Secret 的身份的登录
    * CID 由子账号创建 并订阅topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
    * 启动实例 消费者在线 且可以接收消息
    * PS. CID_DEV_xxx_TRIP_TOMAS 同一个 CID 可以启动多个实例 但是必须保证 每个实例订阅的 topic 和 tag 一致 不然会违反消息一致性原则 导致消息消费混乱
    * @throws Exception
    */
    @Test
    public void WithAuthCIDSecond() throws Exception {
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
    //consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    //正确方式(和上个实例一样) 消费成功
    consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
    consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());

    //错误方式(和上个实例不一样) 消费失败 违反订阅关系一致性
    consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());

    consumer.start();
    System.out.println("WithAuthCIDSecond Started.");
    Thread.sleep(1000000000000l);
    }

    /**
    * 子账号dev 子账号(普通权限用户) dev Access/Secret 的身份的登录
    * CID 由子账号创建 并订阅topic (本例:topic_dev_xxx_trip_tomas)
    * 启动实例 消费者在线 且可以接收消息
    * PS. CID_DEV_xxx_TAKING_TOMAS 作为topic_dev_xxx_trip_tomas的消费者 不影响其他模块CID 订阅和消费任何topic
    * @throws Exception
    */
    @Test
    public void WithAuthCID3() throws Exception {
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TAKING_TOMAS");
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
    //consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
    consumer.start();
    System.out.println("reciveMQByDevWithOutAuthCID3 Started.");
    Thread.sleep(1000000000000l);
    }

    /**
    * 子账号dev 以子账号(普通权限用户) devAccess/Secret 的身份的登录
    * 启动实例 消费者在线 且可以接收消息 并且可以突破订阅限制 订阅谁可以消费谁 但仅限于消费(子账号) 被授权的 topic
    *
    * 子账号CID不是 topic指定的消费者 强制作为为topic的消费者时 因为子账号有订阅消费权限 所以 子账号创建的 CID 可以订阅和消费 topic 但是不影响其他模块(其他 CID)
    * @throws Exception
    */
    @Test
    public void WithOutAuthCID() throws Exception {
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_COUPON_TOMAS");
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
    //consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
    consumer.start();
    System.out.println("reciveMQByDevWithOutAuthCID Started.");
    Thread.sleep(1000000000000l);
    }

    /**
    * 子账号CID不是 topic指定的消费者 强制作为为topic的消费者时 因为子账号有订阅消费权限 所以 子账号创建的 CID 可以订阅和消费 topic 但是不影响其他模块(其他 CID)
    * @throws Exception
    */
    @Test
    public void WithOutAuthCID2() throws Exception {
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_ORDER_TOMAS");
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
    consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
    consumer.start();
    System.out.println("reciveMQByDevWithOutAuthCID2 Started.");
    Thread.sleep(1000000000000l);
    }


    /**
    * 强隔离方案:
    * 每个模块实例都使用不同子账号(Access/Secret不同) 每个模块单独分配自己子账号创建的CID.这样模块之间可以保障不能相互订阅和消费.
    * 1.主账号登录并创建topic
    * 2.授权订阅权限给子账号(账号不能访问未授权的 topic)
    * 3.子账号登录 topic管理中创建自己账号下的CID
    * 4.程序中使用 不同子账号(Access/Secret不同)下自己模块的 CID 消费消息 相互不影响
    *
    * @throws Exception
    */
    @Test
    public void recivePayMQByCoodAccountWithAuthCID() throws Exception {
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TEST");
    consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
    consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
    //consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    Consumer consumer = ONSFactory.createConsumer(consumerProperties);
    consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
    consumer.start();
    System.out.println("recivePayMQByCoodAccountWithAuthCID Started.");
    //Thread.sleep(1000000000000l);
    }

    private MessageListener getMessageListener(){
    return new MessageListener() {
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
    try {
    log.info("receive message ={} ConsumeContext={}:", JSON.toJSONString(message), JSON.toJSONString(consumeContext));
    } catch (Exception e) {
    log.error("{}", e);
    return Action.ReconsumeLater;
    }
    return Action.CommitMessage;
    }
    };
    }
     
  • 相关阅读:
    设计模式15:Interpreter 解释器模式(行为型模式)
    设计模式14:Command 命令模式(行为型模式)
    设计模式13:Template Method 模板方法模式(行为型模式)
    设计模式12: Proxy 代理模式(结构型模式)
    敏捷软件开发:原则、模式与实践——第20章 咖啡的启示
    敏捷软件开发:原则、模式与实践——第19章 类图
    敏捷软件开发:原则、模式与实践——第16章 对象图、第17章 用例、第18章 顺序图
    敏捷软件开发:原则、模式与实践——第15章 状态图
    敏捷软件开发:原则、模式与实践——第14章 使用UML
    敏捷软件开发:原则、模式与实践——第13章 写给C#程序员的UML概述
  • 原文地址:https://www.cnblogs.com/xmanblue/p/10881524.html
Copyright © 2011-2022 走看看