zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (6.顺序消费)

    分区有序:

    当发送和消费参与的queue只有一个,则是全局有序

    全局有序:

    如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

    概念:
    队列先天支持FIFO模型,单一生产和消费者下只要保证使用`MessageListenerOrderly`监听器即可

    顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,

    需要确保使用的主题只有一个消息队列。

    并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。

    那么只要顺序的发送,再保证一个线程只去消费一个队列上的消息,那么他就是有序的。


    跟普通消息相比,顺序消息的使用需要在producer的send()方法中添加MessageQueueSelector接口的实现类,
    并重写select选择使用的队列,因为顺序消息局部顺序,需要将所有消息指定发送到同一队列中。

     

    保证有序参与因素:

    - FIFO
    - 队列内保证有序
    - 消费线程

    1、生产者样例

    //发送
        public static void main(String[] args) throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("rocketMq1");
    
            producer.setNamesrvAddr("127.0.0.1:9876");
    
            producer.start();
    
            //20条消息顺序写入
            for (int i = 0; i < 20; i++) {
    
                Message message = new Message("myTopic001", ("顺序--消息!" + i).getBytes());
    
                producer.send(
                        //要发的消息
                        message,
                        //queue选择器,向topic中的哪个 queue 里写入
                        new MessageQueueSelector() {
                            //手动选择一个 queue
                            public MessageQueue select(
                                    //当前topic里包含的所有 queue
                                    List<MessageQueue> list,
                                    //要发的消息
                                    Message message,
                                    //对应到 send() 方法的 arg标识
                                    Object o) {
                                //向固定一个 queue 里写消息(0是第一个队列,默认每个Topic下有4个)
                                MessageQueue queue = list.get(0);
                                //返回 选好的 queue
                                return queue;
                            }
    
                        }, 0, 2000);
            }
    
    //        producer.shutdown();
            System.out.println("生产者下线!");
    
        }

    2、消费者样例

    //接收
        public static void main(String[] args) throws Exception {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketMq1");
    
            consumer.setNamesrvAddr("127.0.0.1:9876");
    
            consumer.subscribe("myTopic001","*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                    for (MessageExt mes: list) {
    
                        System.out.println("mes : "+new String(mes.getBody())+ " Thread:"  + Thread.currentThread().getName() + " queueid:" + mes.getQueueId());
                    }
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            //使用一个线程去消费,保证顺序
            consumer.setConsumeThreadMax(1);
            consumer.setConsumeThreadMin(1);
    
            consumer.start();
            System.out.println("Consumer -SX- start...");
        }
  • 相关阅读:
    关于使用gitlab协同开发提交代码步骤
    一些JavaScript中原理的简单实现
    关于JavaScript中bind、applay、call的区别
    在腾讯云centos7.2上安装配置Node.js记录
    JAVA Web期末项目第三阶段成果
    在腾讯云服务器上安装JDK+Tomcat并启动tomcat
    《JavaScript算法》二分查找的思路与代码实现
    将本地的一个项目托管到自己的GitHub仓库
    《JavaScript算法》常见排序算法思路与代码实现
    项目经理建议:管理时间等于管理自己
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14597967.html
Copyright © 2011-2022 走看看