zoukankan      html  css  js  c++  java
  • RabbitMQ中实现延时消息

    平常项目中很多场景需要使用延时消息处理,例如订单超过多久没有支付需要取消等。如何在消息中间件RabbitMQ中实现该功能?下面描述下使用Dead Letter Exchange实现延时消息场景,当然会有别的其他实现方式。

    1. 什么是Dead Letter Exchange?

    RabbitMQ中通常消息被直接发送到队列中或者从Exchange中Route到队列上后,消息如果被消费者消费完毕并确认后消息就会从Broker中被删除。
    如果存在以下三种情况,同时队列上设置了Dead Letter Exchange,消息会被转送到Dead Letter Exchange中。

    • 消息被拒绝(basicReject或者basicNack) requeue=false
    • 消息存活时间超过了TTL预设值(x-message-ttl)
    • 队列满了

    Dead Letter Exchange像平常的Exchange一样,可以设置它的BuiltinExchangeType,也可以为它绑定队列。
    这里我们可以通过设定Dead Letter Exchange,并为它绑定一个队列,然后定义Consumer消费这个队列,就可以达到处理延时消息的功能。

    2. 代码实例

    流程先:

    I. 定义消息生产者

        /***
         * 消息发送者
         */
        static class NormalEXSend {
            private Connection conn;
            private Channel chnl;
    
            public NormalEXSend(String tag) throws IOException, TimeoutException {
                ConnectionFactory connFact = initConnFac();
                conn = connFact.newConnection();
                chnl = conn.createChannel();
    
                // 定义正常工作Exchange
                chnl.exchangeDeclare(WORKER_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
                // 定义 dead letter exchange
                chnl.exchangeDeclare(DELAY_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                Map<String, Object> args = new HashMap<>();
                args.put("x-message-ttl", 60000); // timeout 1min
                args.put("x-dead-letter-exchange", DELAY_EXCHANGE_NAME);
                args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
    
                // 定义正常工作Queue同时设置dead letter exchange
                chnl.queueDeclare(WORKER_QUEUE_NAME, false, false, false, args);
    
                // 绑定到正常工作Exchange
                chnl.queueBind(WORKER_QUEUE_NAME, WORKER_EXCHANGE_NAME, tag);
            }
    
            /**
             * 发送消息
             * @param key
             * @param msg
             * @throws IOException
             */
            public void send(String key, String msg) throws IOException {
                AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
                // send a message to a exchange
                chnl.basicPublish(WORKER_EXCHANGE_NAME, key, props, msg.getBytes());
                System.out.println(String.format("[%s|%s|Sender] send 【%s】 to exchange:%s", Thread.currentThread().getName(), System.currentTimeMillis(), msg, WORKER_EXCHANGE_NAME));
            }
        }
    

    II. 定义延时消息处理者

    其中receive方法中consumerhandleDelivery方法参数properties可以获取到消息的death原因properties.getHeaders().get("x-first-death-reason"),可能值rejected | expired | maxlen。此处可以根据判断此值去处理由于超时而引起death的消息(就是我们想要处理的延时消息)。

        /**
         * 延时消息处理者
         */
        static class DelayEXRecv {
            private Connection conn;
            private Channel chnl;
    
            public DelayEXRecv() throws IOException, TimeoutException {
                ConnectionFactory connFact = initConnFac();
                conn = connFact.newConnection();
                chnl = conn.createChannel();
                // 定义延时消息队列
                chnl.queueDeclare(DELAY_QUEUE_NAME, false, false, false, null);
    
                // 绑定到延时消息Exchange
                chnl.queueBind(DELAY_QUEUE_NAME, DELAY_EXCHANGE_NAME, DEAD_ROUTING_KEY);
            }
    
            /**
             * 接收消息
             * @throws IOException
             */
            public void receive() throws IOException {
                chnl.basicQos(1);
                // no auto ack
                boolean autoAck = false;
                chnl.basicConsume(DELAY_QUEUE_NAME, autoAck, new DefaultConsumer(chnl) {
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body, "UTF-8");
                        // 打印出消息的death原因 rejected | expired | maxlen
                        // 项目中可以根据原因处理目标消息
                        System.out.println(String.format("[%s|%s|Delay_Receiver] received the delay msg 【%s】 from EXCHANGE: %s, the delay reason is: %s", Thread.currentThread().getName(), System.currentTimeMillis(), message, envelope.getExchange(), properties.getHeaders().get("x-first-death-reason")));
                        // 确认消息
                        chnl.basicAck(envelope.getDeliveryTag(), false);
                    }
                });
            }
        }
    

    III. 试验一把

        private static final String WORKER_EXCHANGE_NAME = "exchange.worker";
        private static final String DELAY_EXCHANGE_NAME = "exchange.delay";
        private static final String WORKER_QUEUE_NAME = "queue.worker";
        private static final String DELAY_QUEUE_NAME = "queue.delay";
        private static final String DEAD_ROUTING_KEY = "dead.key.message";
    
        public static void main(String[] args) {
            ExecutorService exec = Executors.newFixedThreadPool(2);
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        String key = "worker";
                        NormalEXSend sender = new NormalEXSend(key);
                        for (int i =0; i < 5; i++) {
                            sender.send(key, String.format("YaYYY, one message, No.:%s!", i));
                            Thread.sleep(3000);
                        }
                    } catch (IOException | TimeoutException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        DelayEXRecv receiver = new DelayEXRecv();
                        receiver.receive();
                        System.out.println(String.format("[%s|%s|Delay_Receiver] Starting the Delay Msg Receiver process...", Thread.currentThread().getName(), System.currentTimeMillis()));
                    } catch (IOException | TimeoutException e) {
                        e.printStackTrace();
                    }
    
                }
            });
    
            exec.shutdown();
        }
    

    IV. 打印结果

    [pool-1-thread-2|1515750089010|Delay_Receiver] Starting the Delay Msg Receiver process...
    [pool-1-thread-1|1515750089020|Sender] send 【YaYYY, one message, No.:0!】 to exchange:exchange.worker
    [pool-1-thread-1|1515750092020|Sender] send 【YaYYY, one message, No.:1!】 to exchange:exchange.worker
    [pool-1-thread-1|1515750095020|Sender] send 【YaYYY, one message, No.:2!】 to exchange:exchange.worker
    [pool-1-thread-1|1515750098021|Sender] send 【YaYYY, one message, No.:3!】 to exchange:exchange.worker
    [pool-1-thread-1|1515750101022|Sender] send 【YaYYY, one message, No.:4!】 to exchange:exchange.worker
    [pool-2-thread-4|1515750149038|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:0!】 from EXCHANGE: exchange.delay, the delay reason is: expired
    [pool-2-thread-5|1515750152035|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:1!】 from EXCHANGE: exchange.delay, the delay reason is: expired
    [pool-2-thread-6|1515750155035|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:2!】 from EXCHANGE: exchange.delay, the delay reason is: expired
    [pool-2-thread-7|1515750158036|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:3!】 from EXCHANGE: exchange.delay, the delay reason is: expired
    [pool-2-thread-8|1515750161036|Delay_Receiver] received the delay msg 【YaYYY, one message, No.:4!】 from EXCHANGE: exchange.delay, the delay reason is: expired
    

    可以看出消息是在指定延时的1min后才被获取消费。
    Yayy, 至此结束。

    参考:http://www.rabbitmq.com/dlx.html

  • 相关阅读:
    JavaScript对原始数据类型的拆装箱操作
    Javascript继承(原始写法,非es6 class)
    动态作用域与词法作用域
    自行车的保养
    探索JS引擎工作原理 (转)
    C语言提高 (7) 第七天 回调函数 预处理函数DEBUG 动态链接库
    C语言提高 (6) 第六天 文件(续) 链表的操作
    C语言提高 (5) 第五天 结构体,结构体对齐 文件
    C语言提高 (4) 第四天 数组与数组作为参数时的数组指针
    C语言提高 (3) 第三天 二级指针的三种模型 栈上指针数组、栈上二维数组、堆上开辟空间
  • 原文地址:https://www.cnblogs.com/sv00/p/8280082.html
Copyright © 2011-2022 走看看