zoukankan      html  css  js  c++  java
  • 读书笔记-RocketMQ实战与原理解析

    rocketmq的四个角色

    producer,comsumer,broker,nameserver

    rocketmq各个角色之间的关系


    rocketmq使用前需要先新建topic,然后根据topic发送和接收消息

    rocketmq集群方法



    rocketmq使用demo

    https://gitee.com/liran123/rocketmq_demo

    同步,异步发送消息
    public SendResult syncSend(Integer id) {
    
            // 创建 Demo01Message 消息
            Demo01Message message = new Demo01Message();
            message.setId(id);
            // 同步发送消息
            return rocketMQTemplate.syncSend(Demo01Message.TOPIC, message);
        }
    
        public void asyncSend(Integer id, SendCallback callback) {
            // 创建 Demo01Message 消息
            Demo01Message message = new Demo01Message();
            message.setId(id);
            // 异步发送消息
            rocketMQTemplate.asyncSend(Demo01Message.TOPIC, message, callback);
        }
    
        public void onewaySend(Integer id) {
            // 创建 Demo01Message 消息
            Demo01Message message = new Demo01Message();
            message.setId(id);
            // oneway 发送消息
            rocketMQTemplate.sendOneWay(Demo01Message.TOPIC, message);
        
    }
    
    接收消息
    @Component
    
    @RocketMQMessageListener(
            topic = Demo03Message.TOPIC,
            consumerGroup = "demo03-consumer-group-" + Demo03Message.TOPIC
    )
    public class Demo03Consumer implements RocketMQListener<Demo03Message> {
    
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @Override
        public void onMessage(Demo03Message message) {
            logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
        }
    
    
    }
    

    不同类型的消费者








    DefaultMQPushConsumer处理流程

    consumer启动和关闭流程

    当consumer为pull模式,启动可以自己控制,关闭时需要保存offset,需要在异常处理阶段增加把offset写入磁盘的处理,记住了每次关闭的offset,才能保证消息准确性
    push模式,

    关闭时需要调用shutdown函数,释放资源,保存offset

    生产者

    默认使用DefaultMqProducer类

    简单生产者
    public static void main(String[] args) throws MQClientException, InterruptedException {
    
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("192.168.20.4:9876");
            producer.start();
            for (int i = 0; i < 10000; i++) {
                try {
    
                    Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            producer.shutdown();
        }
    
    简单消费者
     public static void main(String[] args) throws InterruptedException, MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
            consumer.setNamesrvAddr("192.168.20.4:9876");
    
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("TopicTest77777", "*");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }
    

    nameserver

    • nameserver是整个消息队列的状态服务器,各个角色要定期向nameserver上报自己的状态信息
    • nameserver与各个组件交流的流程

    topic

    通过命令行操作进行创建

    创建topic的命令会被发送到对应的broker,然后执行具体的逻辑

    消息队列核心机制

    服务器要将文件内容从磁盘发送到客户端,需要经历两个步骤

    tmp_buf是预先申请的内存
    从磁盘复制数据到内核态内存,从内核态内存复制到用户内存 完成read操作
    从用户态内存复制到网络驱动的内核态内存,最后从网络驱动的内核台内存复制到网卡中进行传输 完成write操作

    通过mmap方式,可以省去向用户态内存复制,提高速度
    在java7中的mappedbytebuffer实现


    顺序消息

    分为全局顺序消息和部分顺序消息
    rocket默认情况下会新建8个读队列,8个写队列,消息顺序不保证一致

    全局有序

    局部有序


    发送端通过MessageQueueSelector类来控制把消息发往哪个MessageQueue



    消费者端通过MessageListenerOrderly解决/messageQueue消息被并发处理的问题

    在MessageListenerOrderly实现中,通过为每个Queue加锁,消费每个消息前都要先获取锁, 保证同一时间,同一个Queue不会被重复消费,但不同的queue可以并发处理

    消息重复问题

    ![](https://img2020.cnblogs.com/blog/924254/202102/924254-20210219165648328-396085952.png

    消息优先级的问题




    broker端进行消息过滤:减少流到consumer的消息

    通过tag,key,sql表达式等方式过滤
    一个应用最好使用一个topic,然后不同类型的消息子类型用Tag来标识,服务端基于tag进行过滤,并不需要读取消息体的内容,效率比较高
    key一般用消息在业务层面的唯一标识码来表示,尽量使key唯一

  • 相关阅读:
    通过理解List和IList的区别,加深对接口回调的理解
    mysql学习笔记之mysqlparameter(摘)
    MSSQL表中字段更新后,视图中的字段不更新的解决办法
    如何设置firefox,使其可以支持剪贴板
    CSS图片下载器
    VS2008下.NET 单元测试工具 NUnit2.5 配置与集成方法
    discuz x1.5通过uchome注册后免激活补丁(自动激活)
    (转)七秘诀工作效率与薪水翻番
    TRIGGER OF ORACLE
    SQL LOADER 的使用
  • 原文地址:https://www.cnblogs.com/Baronboy/p/14410649.html
Copyright © 2011-2022 走看看