zoukankan      html  css  js  c++  java
  • RabbitMQ 延时消息队列

    消息延时在日常随处可见:

    1、订单创建10min之后不发起支付,自动取消。

    2、30min定时推送一次邮件信息。

    最常用到方式后台定时任务轮训,量小的时候可以使用,量大会出现数据读取会性能问题。RabbitMQ并没有直接实现延时队列,但是可以利用RabbitMQ两个属性实现延时队列特性:

    1、x-message-ttl:消息过期时间(Time To Live,TTL),超过过期时间之后即变为死信(Dead-letter),不会再被消费者消费。

    设置TTL有两种方式:

      (1)创建队列时指定x-message-ttl,此时整个队列具有统一过期时间;

      (2)发送消息为每个消息设置expiration,此时消息之间过期时间不同。 

    注意:如果两者都设置,过期时间取两者最小。 

    2、x-dead-letter-exchange(RabbitMQ文档) :过期消息路由转发(转发器类型),当消息达到过期时间由该exchange按照配置的x-dead-letter-routing-key转发到指定队列,最后被消费者消费。

    spring整合RabbitMQ : 

    RabbitMQ延时队列逻辑:

      

    1、exchange_delay_begin:缓冲队列exchange交换器,用于将消息转发至缓存消息队列 queue_delay_begin 。

    2、exchange_delay_done:死信(dead-letter)队列exchange交换器,用于将队列 queue_delay_begin 转发到死信队列。

    3、queue_delay_begin:缓冲消息队列,等待消息过期。

    4、queue_delay_done:死信消息队列,消费者能够真正消费信息。

     spring-rabbitmq.xml :

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
                                http://www.springframework.org/schema/context
                                http://www.springframework.org/schema/context/spring-context-3.1.xsd
                                http://www.springframework.org/schema/tx
                                http://www.springframework.org/schema/tx/spring-tx.xsd
                                http://www.springframework.org/schema/aop
                                http://www.springframework.org/schema/aop/spring-aop.xsd
                                http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
                                http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
    
        <!--配置connection-factory,指定连接rabbit server参数 -->
        <rabbit:connection-factory id="connectionFactory"  username="guest" password="guest" host="127.0.0.1" port="5672" publisher-confirms="true"/>
    
        <!-- 延时队列 -->
        <rabbit:direct-exchange id="exchange_delay_begin" name="exchange_delay_begin"  durable="false" auto-delete="false" >
            <rabbit:bindings>
                <rabbit:binding queue="queue_delay_begin" key="delay" />
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <rabbit:queue name="queue_delay_begin" durable="false">
            <rabbit:queue-arguments>
                <!--  队列过期时间 -->
                <entry key="x-message-ttl" value="30000" value-type="java.lang.Long" />
                <entry key="x-dead-letter-exchange" value="exchange_delay_done" />
                <entry key="x-dead-letter-routing-key" value="delay" />
            </rabbit:queue-arguments>
        </rabbit:queue>
    
        <rabbit:direct-exchange id="exchange_delay_done" name="exchange_delay_done"  durable="false" auto-delete="false" >
            <rabbit:bindings>
                <rabbit:binding queue="queue_delay_done" key="delay" />
                <!--  binding key 相同为 【delay】exchange转发消息到多个队列 -->
                <!--<rabbit:binding queue="queue_delay_done_two" key="delay" />-->
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <rabbit:queue name="queue_delay_done" durable="false"/>
    
        <rabbit:template id="delayMsgTwoTemplate" connection-factory="connectionFactory" />
    
        <bean id="messageConsumer" class="com.nancy.rabbitmq.demo.MessageConsumer"></bean>
    
        <!-- 消息接收者 -->
        <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" >
            <rabbit:listener queues="queue_delay_done" ref="messageConsumer" />
        </rabbit:listener-container>
    
    </beans>
    复制代码

     DelayMessageProducer.java

    复制代码
    @Service
    public class DelayMessageProducer {
    
        @Resource(name="delayMsgTwoTemplate")
        private AmqpTemplate delayMsgTwoTemplate;
    
        public void delayMsgTwo(String exchange, String routingKey, Object msg) {
            delayMsgTwoTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setExpiration(String.valueOf(10000));
                    return message;
                }
            });
        }
    }
    复制代码

     MessageConsumer.java

    复制代码
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class MessageConsumer implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            System.out.println("consumer receive message 22------->:{}"+ message);
    
        }
    }
    复制代码

    application.xml 

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    
        <import resource="spring-rabbitmq.xml" />
    
        <!-- 扫描指定package注释的注册为Spring Beans -->
        <context:component-scan base-package="com.nancy.rabbitmq" />
    
        <!-- 激活annotation功能 -->
        <context:annotation-config />
        <!-- 激活annotation功能 -->
        <context:spring-configured />
    
    </beans>
    复制代码

    pom.xml

    复制代码
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>learning-demo</artifactId>
            <groupId>com.nancy</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>demo-jms</artifactId>
        <packaging>jar</packaging>
    
        <name>demo-jms</name>
        <url>http://maven.apache.org</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <spring.version>5.0.0.RELEASE</spring.version>
        </properties>
    
        <dependencies>
    
            <dependency>
                <groupId>org.aspectj</groupId>
                <artifactId>aspectjweaver</artifactId>
                <version>1.8.6</version>
            </dependency>
            <!--依赖包 -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <!--rabbit MQ -->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-amqp</artifactId>
                <version>1.6.6.RELEASE</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-core</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.6.6.RELEASE</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-web</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-core</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-context</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-tx</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-messaging</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!--spring -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-core</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aspects</artifactId>
                <version>${spring.version}</version>
                <exclusions>
                    <exclusion>
                            <groupId>org.aspectj</groupId>
                            <artifactId>aspectjweaver</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>${spring.version}</version>
            </dependency>
            <!--junit -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
            </dependency>
        </dependencies>
    
    </project>
    复制代码

     DelayQueueTest.java

    复制代码
    import com.alibaba.fastjson.JSONObject;
    import com.nancy.rabbitmq.delay.DelayMessageProducer;
    import org.junit.Test;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class DelayQueueTest {
    
        private ApplicationContext context = null;
    
        //    @Ignore
        @org.junit.Before
        public void setUp() throws Exception {
            context = new ClassPathXmlApplicationContext("rabbitmq/application.xml");
        }
    
        //    @Ignore
        @Test
        public void delayQueueTest() throws Exception {
            DelayMessageProducer messageProducer = context.getBean(DelayMessageProducer.class);
            int a = 10;
            while (a > 0) {
                System.out.println("send "+ a);
                messageProducer.delayMsgTwo("exchange_delay_begin","delay", "hello world delay2 :" + a--);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            System.out.println("sended ");
            Thread.sleep(1000*60);
        }
        
    }
    复制代码

     运行结果: 发送消息 10s之后, 消费监听到消息 消费。

    复制代码
    send 10
    send 9
    send 8
    send 7
    send 6
    send 5
    send 4
    send 3
    send 2
    send 1
    sended 
    consumer receive message 22------->:{}(Body:'hello world delay2 :10' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    consumer receive message 22------->:{}(Body:'hello world delay2 :9' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    consumer receive message 22------->:{}(Body:'hello world delay2 :8' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    consumer receive message 22------->:{}(Body:'hello world delay2 :7' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    consumer receive message 22------->:{}(Body:'hello world delay2 :6' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    consumer receive message 22------->:{}(Body:'hello world delay2 :5' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    consumer receive message 22------->:{}(Body:'hello world delay2 :4' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=7, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    consumer receive message 22------->:{}(Body:'hello world delay2 :3' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=8, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    consumer receive message 22------->:{}(Body:'hello world delay2 :2' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=9, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    consumer receive message 22------->:{}(Body:'hello world delay2 :1' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=10, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
    复制代码

    参考资料:

    DLX: https://www.rabbitmq.com/dlx.html

    RabbitMQ如何实现延迟队列?:https://blog.csdn.net/u013256816/article/details/55106401

    消息队列之 RabbitMQ :https://www.jianshu.com/p/79ca08116d57

    使用RabbitMQ实现延迟任务 : https://www.cnblogs.com/haoxinyue/p/6613706.html

     

    转自:https://www.cnblogs.com/xiaoxing/p/9250823.html

    寻找撬动地球的支点(解决问题的方案),杠杆(Java等编程语言)已经有了。xkzhangsan
  • 相关阅读:
    Linux常用基本命令:三剑客命令之-awk数组用法
    Linux常用基本命令:三剑客命令之-awk动作用法(1)
    Linux常用基本命令:三剑客命令之-awk模式用法(2)
    Linux常用基本命令:三剑客命令之-awk模式用法(1)
    Linux常用基本命令:三剑客命令之-awk格式化动作
    Linux常用基本命令:三剑客命令之-awk内置变量与自定义变量
    Linux常用基本命令:三剑客命令之-awk输入输出分隔符
    Linux常用基本命令:三剑客命令之-awk基础用法
    Linux环境变量详解与应用
    在js中怎么判断两个字符串相等
  • 原文地址:https://www.cnblogs.com/xkzhangsanx/p/11025944.html
Copyright © 2011-2022 走看看