zoukankan      html  css  js  c++  java
  • RocketMq(四、顺序消息)

    RocketMQ 通过把同一业务键(例如订单ID)的消息发送到同一个队列,并由消费者按队列顺序消费,来实现顺序消息

    Producer

    package com.wk.test.rocketmqTest.shunxu;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    
    public class Producer {
    
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("shunxu_producer");
            producer.setNamesrvAddr("10.32.16.179:9876");
            try {
                producer.start();
                List<Order> orders = getOrders();
                for(Order order:orders){
                    String uuid = UUID.randomUUID().toString();
                    Message message = new Message("shunxuTopic", "order",uuid,("顺序消费:订单号_"+order.getOrderId()+" 订单状态_"+order.getOrderStatus()).getBytes());
                    SendResult sendResult = producer.send(message, (list, message1, obj) -> {
                        int value = obj.hashCode();
                        if(value < 0){
                            value = Math.abs(value);
                        }
                        //取obj的hashCode的绝对值,然后对list.size()进行取余,得到目标队列在list的下标
                        value = value % list.size();
                        return list.get(value);
                    },order.getOrderId(),10000);//通过订单ID来获取对应的队列,保证每个订单的顺序是创建-支付-完成
                    System.out.println(sendResult);
                }
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            }finally {
                producer.shutdown();
            }
        }
    
        public static List<Order> getOrders(){
            List<Order> orders = new ArrayList<>();
            Order order = new Order();
            order.setOrderId("111");
            order.setOrderStatus("创建");
            orders.add(order);
    
            order = new Order();
            order.setOrderId("111");
            order.setOrderStatus("支付");
            orders.add(order);
    
            order = new Order();
            order.setOrderId("222");
            order.setOrderStatus("创建");
            orders.add(order);
    
            order = new Order();
            order.setOrderId("333");
            order.setOrderStatus("创建");
            orders.add(order);
    
            order = new Order();
            order.setOrderId("111");
            order.setOrderStatus("完成");
            orders.add(order);
    
            order = new Order();
            order.setOrderId("333");
            order.setOrderStatus("支付");
            orders.add(order);
    
            order = new Order();
            order.setOrderId("222");
            order.setOrderStatus("支付");
            orders.add(order);
    
            order = new Order();
            order.setOrderId("333");
            order.setOrderStatus("完成");
            orders.add(order);
    
            order = new Order();
            order.setOrderId("222");
            order.setOrderStatus("完成");
            orders.add(order);
    
            return orders;
    
        }
    
    }

    Consumer

    package com.wk.test.rocketmqTest.shunxu;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    /**
     * Demo consumer for the ordered-message example: subscribes to every tag of
     * {@code shunxuTopic} with an orderly listener and prints each message body.
     */
    public class Consumer {

        /**
         * Configures the push consumer, registers the orderly listener and starts
         * consuming from the first offset.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("shunxuConsumer");
            consumer.setNamesrvAddr("10.32.16.179:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            try {
                consumer.subscribe("shunxuTopic", "*");
                consumer.registerMessageListener(new MessageListenerOrderly() {
                    @Override
                    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                        consumeOrderlyContext.setAutoCommit(true);
                        for (MessageExt messageExt : list) {
                            // Decode with the Charset constant rather than the name "UTF-8":
                            // no checked UnsupportedEncodingException to swallow, so a message
                            // can never be skipped silently while SUCCESS is still returned.
                            System.out.println(new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                        }
                        return ConsumeOrderlyStatus.SUCCESS;
                    }
                });
                consumer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }

        }
    }

    Order

    package com.wk.test.rocketmqTest.shunxu;
    
    /**
     * Mutable value holder for one order event: an order id plus a status text.
     * Both fields are {@code null} until set.
     */
    public class Order {

        /** Identifier of the order. */
        private String orderId;

        /** Status text of the order. */
        private String orderStatus;

        /** Sets the order id. */
        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }

        /** Returns the order id, or {@code null} if it was never set. */
        public String getOrderId() {
            return this.orderId;
        }

        /** Sets the order status. */
        public void setOrderStatus(String orderStatus) {
            this.orderStatus = orderStatus;
        }

        /** Returns the order status, or {@code null} if it was never set. */
        public String getOrderStatus() {
            return this.orderStatus;
        }
    }
  • 相关阅读:
    Populating Next Right Pointers in Each Node I&&II ——II仍然需要认真看看
    MySQL源码分析以及目录结构
    mysql分表的三种方法
    Hadoop学习
    关系型数据库ACID
    九种基本数据类型和它们的封装类
    java中堆和栈的区别
    软件测试-----Graph Coverage作业
    Lab1--关于安装JUnit的简要描述
    动态导入(import)和静态导入(import)的区别
  • 原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12420072.html
Copyright © 2011-2022 走看看