zoukankan      html  css  js  c++  java
  • RocketMQ之Consumer

    一、Consumer 介绍

    1.1 核心参数

    * consumerGroup:消费者组名  
    * MessageModel:消息模型,定义了消息传递到消费者的方式,默认是 MessageModel.CLUSTERING
        * MessageModel.BROADCASTING:广播
        * MessageModel.CLUSTERING:集群
    * consumeFromWhere: 消费者开始消费的位置,默认值是 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
    	* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:从队列最后的位置开始消费
        * ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET:从队列前面最开始消费
        * ConsumeFromWhere.CONSUME_FROM_TIMESTAMP: 从指定时间开始消费,之前的消息将会被忽略    
    * consumeTimestamp:
    * allocateMessageQueueStrategy:消息分配策略
    * subscription:订阅关系
    * offsetStore:存储消息偏移量
    * consumeThreadMin:线程池最小值,默认值是20
    * consumeThreadMax:线程池最大值,默认值是20
    * consumeConcurrentlyMaxSpan:单个队列并行消费最大的跨度,默认2000
    * pullThresholdForQueue:一个队列最大的消费个数,默认1000
    * pullInterval:消息拉取的时间间隔
    * pullBatchSize:消息拉取的个数,默认32啊
    * consumeMessageBatchMaxSize:批量消费量,默认1  
    * messageListener:消息监听器,用来处理消息,它有两个实现类
        * MessageListenerOrderly:按顺序一个个消费    
        * MessageListenerConcurrently:并行消费
    

    二、消费模式

    2.1 集群模式

    * 同一个 consumerGroup 里,并且订阅的 tag 也必须是一样的,这样的 consumer 实例才能组成 consumer 集群;
    * 当 consumer 使用集群消费时,每条消息只会被 consumer 集群内的任意一个 consumer 实例消费一次;
    * 默认的消费模式就是集群模式;
    * 集群模式天然实现负载均衡机制
    

    2.2 广播模式

    * 同一个 consumerGroup 里的 Consumer 会消费订阅 Topic 的全部消息
    * 通过 consumer.setMessageModel(MessageModel.BROADCASTING) 方法设置    
    

    三、Offset 介绍

    3.1 Offset 是什么

    * 在 RocketMQ 中,相同类型的消息会放到一个 Topic 里,为了可以并行操作,一个 Topic 会有多个 MessageQueue。  
    * Offset 是指某个 Topic 下的一条消息在某个 MessageQueue 里的位置;
    * 通过 Offset 的值可以定位到这条消息
    

    3.2 Offset 类结构

    从类结构可以看出 Offset 分为本地文件类型和远程文件类型。

    3.2 消费模式采用的 Offset 类型

    * 集群模式下因为每个 Consumer 消费所订阅主题的一部分,所以采用远程文件存储 Offset;
    * 广播模式下,由于每个 Consumer 需要消费所有的消息,所以采用本地文件存储 Offset。
    

    3.3 Offset 文件存储格式

    OffseStore 使用 Json 格式存储,例如:

    {
        "OffsetTable":{
            1:{
                "brokeName":"localhost",
                "QueueId":1,
                "Topic":"broker1"
            },
           2:{
                "brokeName":"localhost",
                "QueueId":2,
                "Topic":"broker2"
            }
        }
    }
    

    四、不同类型的消费者

    根据对读取操作的控制情况,可以消费者分为两种类型。一个是 DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;另一个是 DefaultMQPullConsumer ,读取操作中的大部分功能由使用者自主控制。

    4.1 DefaultMQPushConsumer

    DefaultMQPushConsumer 只需要设置好各种参数和设置传入处理消息的回调函数即可,系统收到消息后会自动调用处理函数来处理消息,而且加入新的 DefaultMQPushConsumer 后会自动做负载均衡。

    4.1.1 实例

    public class Consumer {
    
        public static void main(String[] args) throws MQClientException {
            // 创建消费者对象
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupName");
            // 设置服务器地址
            consumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
            // 订阅指定主题
            consumer.subscribe("topicTest","*");
            // 注册消息监听事件
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.println("msg:"+msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动消费者
            consumer.start();
        }
    
    }
    

    4.2 DefaultMQPullConsumer

    4.2.1 消费步骤

    1) 读取 topic 的消息队列 message queue 的信息;
    2) 按队列去拉取一定数目的消息;
    3) 持久化message queue的消费进度 offset;
    4) 根据不同的消息状态做不同的处理
    

    4.2.2 拉取结果状态

    public enum PullStatus {
    	// 拉取成功
        FOUND,
    	// 没有消息可以拉取
        NO_NEW_MSG,
    	// 过滤结果不匹配
        NO_MATCHED_MSG,
    	// 偏移量非法,太大或太小
        OFFSET_ILLEGAL
    }
    

    4.2.3 实例

    public class PullConsumer {
    	// 本地 offset 存储容器,生产环境可以放到数据库或 Redis 中
        private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
    
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("DefaultMQPullConsumer");
            // 设置服务器地址
            pullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
            // 启动消费者
            pullConsumer.start();
            // 从指定 topic 获取所有的队列
            Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues("topicTest");
            // 遍历队列,拉取消息
            for (MessageQueue mq : messageQueues) {
                System.out.printf("从队列中消费: %s%n", mq);
                SINGLE_MQ:
                while (true) {
                    try {
                        // 获取 offset
                        Long offset = getMessageQueueOffset(mq);
                        // 拉取32个消息
                        PullResult pullResult =
                                pullConsumer.pullBlockIfNotFound(mq, null, offset, 32);
                        System.out.printf("%s%n", pullResult);
                        // 保存 offset
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                break;
                            case NO_MATCHED_MSG:
                                break;
                            case NO_NEW_MSG:
                                break SINGLE_MQ;
                            case OFFSET_ILLEGAL:
                                break;
                            default:
                                break;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
    
            pullConsumer.shutdown();
        }
    
        // 保存上次消费的消息下标
        private static void putMessageQueueOffset(MessageQueue mq,
                                                  long nextBeginOffset) {
            OFFSE_TABLE.put(mq, nextBeginOffset);
        }
    
        // 获取上次消费的消息的下标
        private static Long getMessageQueueOffset(MessageQueue mq) {
            Long offset = OFFSE_TABLE.get(mq);
            if (offset != null) {
                return offset;
            }
            return 0l;
        }
    }
    

    4.3 DefaultLitePullConsumer

    **DefaultMQPullConsumer ** 已经被标识为废弃,替代的是 DefaultLitePullConsumer,下面我们就直接使用 DefaultLitePullConsumer 来操作。

    public class LitePullConsumer {
    
        public static volatile boolean running = true;
    
        public static void main(String[] args) throws Exception {
            DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer");
            // 设置服务器地址
            litePullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
            // 关闭自动提交偏移量
            litePullConsumer.setAutoCommit(false);
            // 启动消费者
            litePullConsumer.start();
            // 获取队列
            Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("topicTest");
            List<MessageQueue> list = new ArrayList<>(mqSet);
            List<MessageQueue> assignList = new ArrayList<>();
            for (int i = 0; i < list.size() / 2; i++) {
                assignList.add(list.get(i));
            }
            litePullConsumer.assign(assignList);
            litePullConsumer.seek(assignList.get(0), 10);
            try {
                while (running) {
                    List<MessageExt> messageExts = litePullConsumer.poll();
                    System.out.printf("%s %n", messageExts);
                    litePullConsumer.commitSync();
                }
            } finally {
                litePullConsumer.shutdown();
            }
    
        }
    }
    
  • 相关阅读:
    js图片滑动展示
    那些好像失败了却很有趣的奇怪产物——傅里叶变换图片篇
    啊,满足了我对javaBean的所有幻想,记录一个神器:Lombok!
    十几行代码将mock生成的json数据转为sql的insert语句
    python之三目运算符的替代品?
    【python爬虫】每天统计一遍up主粉丝数!
    大项目之网上书城(十二)——完成啦
    大项目之网上书城(十一)——前台完成
    大项目之网上书城(十)——自动登录
    vs2019 创建vue项目
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/12545597.html
Copyright © 2011-2022 走看看