zoukankan      html  css  js  c++  java
  • RocketMQ生产消费模型选择

    一. 生产者,根据某个标识将消息放到同一个队列中

    在发送消息时,使用SelectMessageQueueByHash,该类根据传入进去的arg,进行hash计算,将消息分配到相应的队列中。

    public class Producer {
    
        public static void main(String[] args) throws MQClientException {
    
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            producer.setNamesrvAddr("10.130.41.36:9876");
            producer.setInstanceName("Producer");
            producer.setVipChannelEnabled(false);
            producer.start();
    
            String[] tags = {"tagA","tagB"};
    
            for (int i = 1; i <= 10; i++) {
                try {
                    Message msg = new Message("TopicTest",tags[i%tags.length],"key1"+i,("订单一号" + i).getBytes());
                    SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),1);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            for (int i = 1; i <= 10; i++) {
                try {
                    Message msg = new Message("TopicTest",tags[i%tags.length],"key2"+i,("订单二号" + i).getBytes());
                    SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),2);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            for (int i = 1; i <= 10; i++) {
                try {
                    Message msg = new Message("TopicTest",tags[i%tags.length],"key3"+i,("订单三号" + i).getBytes());
                    SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),3);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            producer.shutdown();
        }
    }
    

      上述代码执行后Topic队列中的内容:

    二. 消费者

    (1). 顺序消费

    使用MessageListenerOrderly,顺序消费同一个队列中的数据,只有第一个数据消费成功了才会消费第二个数据。

    模拟在消费某个队列中的数据时出现了阻塞状态。

    public class ConsumerOrderly {
        public static void main(String[] args) throws InterruptedException,
                MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
            consumer.setNamesrvAddr("10.130.41.36:9876");
            consumer.setInstanceName("Consumer1");
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe("TopicTest", "*");
    
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的
                    context.setAutoCommit(true);
                    try {
                        for (MessageExt msg:msgs) {
                            String msgKey = msg.getKeys();
                            if(msgKey.equals("key13") || msgKey.equals("key22")){
                                Thread.sleep(1000);
                            }
                            System.out.println(" 消费者1 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
    
                    //消费成功
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
            /**
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
             */
            consumer.start();
    
            System.out.println("C1 Started.");
        }
    }
    

      测试结果如下:

    当"订单一号3"没有消费时 "订单一号4","订单一号5"是不能被消费的,"订单二号2"也是同样的情况。

    (2). 并发消费

    使用MessageListenerConcurrently,并发消费同一个队列中的数据,不能保证消费的顺序。

    模拟在消费某个数据时出现了阻塞状态。

    public class ConsumerConcurrently {
        public static void main(String[] args) throws InterruptedException,
                MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
            consumer.setNamesrvAddr("10.130.41.36:9876");
            consumer.setInstanceName("Consumer1");
            consumer.setMessageModel(MessageModel.CLUSTERING);
    
            consumer.subscribe("TopicTest", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg:msgs) {
                            String msgKey = msg.getKeys();
                            if(msgKey.equals("key13") || msgKey.equals("key22")){
                                Thread.sleep(1000);
                            }
                            System.out.println(" 消费者1 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
    
                    //消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("C1 Started.");
        }
    }
    

      测试结果如下

    当消费"订单一号3"阻塞时,会将后面的数据交给其他线程消费,所以"订单一号4" ,"订单一号5"在 "订单一号3"之前消费了。

    (3). 集群消费

    不同消费者设置成相同的组名,在MessageModel.CLUSTERING模式下,不同消费者会消费不同的队列,同一个消费者中保证顺序

    消费者1

    public class ConsumerOrderly_1 {
        public static void main(String[] args) throws InterruptedException,
                MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
            consumer.setNamesrvAddr("10.130.41.36:9876");
            consumer.setInstanceName("Consumer1");
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe("TopicTest", "*");
    
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的
                    context.setAutoCommit(true);
                    try {
                        for (MessageExt msg:msgs) {
                            String msgKey = msg.getKeys();
                            if(msgKey.equals("key13")){
                                Thread.sleep(1000);
                            }
                            System.out.println(" 消费者1 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
    
                    //消费成功
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
            /**
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
             */
            consumer.start();
    
            System.out.println("C1 Started.");
        }
    }

    消费者2

    public class ConsumerOrderly_2 {
        public static void main(String[] args) throws InterruptedException,
                MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
            consumer.setNamesrvAddr("10.130.41.36:9876");
            consumer.setInstanceName("Consumer2");
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe("TopicTest", "*");
    
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的
                    context.setAutoCommit(true);
                    try {
                        for (MessageExt msg:msgs) {
                            String msgKey = msg.getKeys();
                            if(msgKey.equals("key22")){
                                Thread.sleep(1000);
                            }
                            System.out.println(" 消费者2 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
    
                    //消费成功
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
            /**
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
             */
            consumer.start();
    
            System.out.println("C2 Started.");
        }
    }

    测试结果如下:

    消费者1负责队列1,并保证队列1中的所有消息是按照顺序消费的

    消费者2负责队列2和队列3,根据"订单二号2"可以看出,他保证了队列2和队列3的顺序消费。

    (4). 消费者A和消费者B同组,消费者A消费tagA,消费者B消费tagB如图

     在这种情况下,因为集群中订阅消息不一致,导致消费出现问题,最后启动的消费者才可以正常消费消息。

    要解决这个问题,需要保证集群中的消费者拥有统一的订阅消息,Topic和Tag要一致才可以。

    参考:
    https://www.jianshu.com/p/524ef06ce25a
    https://mp.weixin.qq.com/s/HbIS0yEJsCPMYwwYDBIvMQ

    (5). 消费者A和消费者B不同组,消费者A消费tagA,消费者B消费tagB

    在消费者1中,能保证tagA1,tagA2顺序的消费,消费者2中能保证tagB1,tagB2顺序的消费。
    但是不能保证tagA1和tagB1的消费顺序。

    测试代码:

    消费者1

    public class ConsumerOrderly_1 {
        public static void main(String[] args) throws InterruptedException,
                MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
            consumer.setNamesrvAddr("10.130.41.36:9876");
            consumer.setInstanceName("Consumer1");
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe("TopicTest", "tagA");
    
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的
                    context.setAutoCommit(true);
                    try {
                        for (MessageExt msg:msgs) {
                            System.out.println(" 消费者1 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
    
                    //消费成功
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
            /**
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
             */
            consumer.start();
    
            System.out.println("C1 Started.");
        }
    }

    消费者2

    public class ConsumerOrderly_2 {
        public static void main(String[] args) throws InterruptedException,
                MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName1");
            consumer.setNamesrvAddr("10.130.41.36:9876");
            consumer.setInstanceName("Consumer2");
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe("TopicTest", "tagB");
    
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的
                    context.setAutoCommit(true);
                    try {
                        for (MessageExt msg:msgs) {
                            String msgKey = msg.getKeys();
                            if(msgKey.equals("key11")){
                                Thread.sleep(1000);
                            }
                            System.out.println(" 消费者2 ==> 当前线程:"+Thread.currentThread().getName()+" ,quenuID: "+msg.getQueueId()+ " ,content: " + new String(msg.getBody()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
    
                    //消费成功
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
            /**
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
             */
            consumer.start();
    
            System.out.println("C2 Started.");
        }
    }

    测试结果:

    消费者1

    消费者2

    "订单一号2" 在 "订单一号1" 前被消费了。



  • 相关阅读:
    元旦晚会
    CF906D Power Tower
    基于51单片机的多功能秒表(校赛作品)
    集训队第二次排位赛
    《史记》——五帝本纪第一,黄帝部分
    原创,让你的 "Ajax"请求华丽转身,给 "body" 或是 "Div" 加上Loading遮罩!
    Excel导出通用操作方式
    图片(img标签)的onerror事件,你有用过嘛?
    @Url.ActionLink 和 @Url.Action
    原创,自己做的一个简单实用的提示小插件,兼容性很好,基本上都兼容!
  • 原文地址:https://www.cnblogs.com/Sicwen/p/10528201.html
Copyright © 2011-2022 走看看