zoukankan      html  css  js  c++  java
  • rabbitmq延迟队列demo

     1. demo详解

    1.1 工程结构:

    1.2 pom

    定义jar包依赖的版本。版本很重要,rabbit依赖spring,两者必须相一致,否则报错:

    <properties>
        <springframework.version>4.2.7.RELEASE</springframework.version>
        <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version>
    </properties>

    dependencies:

    <dependencies>
    
        <!-- LOGGING begin -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.0.13</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>
        <!-- 代码直接调用common-logging会被桥接到slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>1.7.5</version>
        </dependency>
        <!-- LOGGING end -->
    
        <!--springframework-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${springframework.version}</version>
        </dependency>
    
        <!-- rabbitmq spring依赖 -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>${spring-rabbit.version}</version>
        </dependency>
    
    </dependencies>

    1.3 spring配置

    spring-applicationContext:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd 
             http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="fileEncoding" value="UTF-8"></property>
            <property name="locations">
                <list>
                    <value>classpath:applicationContext.properties</value>
                </list>
            </property>
        </bean>
    
        <context:annotation-config/>
    
        <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
        <!-- 配置扫描路径 -->
        <context:component-scan base-package="demo"></context:component-scan>
    
        <!--rabbit server参数 -->
        <rabbit:connection-factory id="connectionFactory"
                                   username="${paycenter.mq.user.username}"
                                   password="${paycenter.mq.user.password}"
                                   addresses="${paycenter.mq.user.host}"></rabbit:connection-factory>
    
        <import resource="classpath:mq-applicationContext-producer.xml"/>
        <import resource="classpath:mq-applicationContext-consumer.xml"/>
    </beans>

    mq-applicationContext-producer.xml: 

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
    
        <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 -->
        <bean id="mqMessageConverter"
              class="org.springframework.amqp.support.converter.SimpleMessageConverter">
        </bean>
    
        <!--<bean id="publisherConfirmsReturns" class="com.emaxcard.mq.rabbit.PublisherConfirmsReturns"></bean>-->
    
    
        <!--========================延迟队列配置 begin =========================-->
        <rabbit:queue id="agentpayqueryQueue2" durable="true" auto-delete="true" exclusive="false"
                      name="agentpayqueryQueue2"/>
        <rabbit:direct-exchange id="agentpayqueryExchange2" durable="true" auto-delete="true" name="agentpayqueryExchange2">
            <rabbit:bindings>
                <rabbit:binding queue="agentpayqueryQueue2" key="delay"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
    
        <rabbit:queue id="agentpayqueryQueue1" durable="true" auto-delete="true" exclusive="false"
                      name="agentpayqueryQueue1">
            <rabbit:queue-arguments>
                <entry key="x-dead-letter-exchange" value="agentpayqueryExchange2"/>
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
            </rabbit:queue-arguments>
        </rabbit:queue>
        <rabbit:direct-exchange id="agentpayqueryExchange1" durable="true" auto-delete="true" name="agentpayqueryExchange1">
            <rabbit:bindings>
                <rabbit:binding queue="agentpayqueryQueue1" key="delay"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <!--定义RabbitTemplate实例-->
        <!--confirm-callback="publisherConfirmsReturns" return-callback="publisherConfirmsReturns"-->
        <rabbit:template id="agentpayQueryMsgTemplate"
                         exchange="agentpayqueryExchange1" routing-key="delay"
                         connection-factory="connectionFactory" message-converter="mqMessageConverter"
                         mandatory="true"
        />
        <!--========================延迟队列配置 end =========================-->
    
    </beans>

    mq-applicationContext-consumer.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
    
    
        <bean id="agentpayQueryConsumer" class="demo.TestMQConsumer" />
    
        <!-- TODO 后续删除
        receive-timeout:等待接收超时时长 影响连接创建和销毁
    
        concurrency:消费者个数
        max-concurrency:最大消费者个数
        min-start-interval:陆续启动  减少并发环境(或是三方系统突然的网络延迟) 大量连接导致的性能耗损
        min-stop-interval:陆续销毁   减少突然的安静 导致大量可用连接被销毁
        min-consecutive-active: 连续N次没有接收发生超时  则认定为需要创建 消费者
        min-consecutive-idle: 连续N次发生了接收超时   则认定消费者需要销毁
    
        prefetch:每个消费者预读条数 因为异步调用三方 性能瓶颈在网络与三方系统所以预读取条数设置为1(默认为5) 只有一条消息被ACK才会接收下一条消息
        transaction-size:会影响prefetch的数量
        -->
        <!--  监听器 -->
        <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"
                                   max-concurrency="20"
                                   concurrency="5"
                                   prefetch="10">
            <rabbit:listener ref="agentpayQueryConsumer" queue-names="agentpayqueryQueue2" />
        </rabbit:listener-container>
    </beans>

     1.4 class

    Producer类:
    package demo;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext.xml")
    public class TestMQProducer {
    
        private static Logger logger = LoggerFactory.getLogger(TestMQProducer.class.getSimpleName());
    
        @Autowired
        private RabbitTemplate agentpayQueryMsgTemplate;
    
        @Test
        public void test() throws Exception {
            for (int i = 0; i <= 100; i++) {
                Object data = String.valueOf(i);
                agentpayQueryMsgTemplate.convertAndSend(data);
                logger.info("入队:{}", data);
            }
            Thread.sleep(12000);
        }
    }
    Consumer类:
    package demo;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class TestMQConsumer implements MessageListener {
    
        private static Logger logger = LoggerFactory.getLogger(TestMQConsumer.class.getSimpleName());
    
        public void onMessage(Message message) {
            String data = new String(message.getBody());
    
            try {
                //模拟处理慢
                Thread.sleep(1);
    
                logger.info("出队:{}", data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
    }

     1.5 rabbitmq控制台页面截图

    我这里rabbit版本是3.7.7

    2. 说明

    2.1 auto-delete

    上面定义队列时我把auto-delete属性设置为true, 所以,当消费者消费完并关闭连接后,队列会自动删除。exchange也如是。(通过mq控制台看,栗子中的agentpayqueryQueue2和agentpayqueryExchange2在执行完就自动消失了,agentpayqueryQueue1和agentpayqueryExchange1还存在。)

    spring-rabbit-x.xml里对queue和exchange的auto-delete属性的解释:

    Flag indicating that an queue will be deleted when it is no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:queue)
    
    
    Flag indicating that an exchange will be deleted when no longer in use, i.e. the connection that declared it is closed. Default is false.(rabbit:exchange)

    2.2 消费端的concurrency

    同样,看spring-rabbit-x.xml的解释:

    The number of concurrent consumers to start for each listener initially.
    See also 'max-concurrency'.

    上面我设置的值是5,从mq控制台里看queue的consumer见下图(消息被全部消费之后,channel关闭,连接断开,consumer自动清空):

    从出队日志,可以看出来,共有5个线程在消费这些消息。

    2.3 consumer配置补充说明

    注意上面rabbitmq的consumer配置文件。因为是测试demo,所以没有定义queue。而在实际的企业应用开发中,是需要先定义queue的,否则找不到queue,启动容器会报错的,见下面stacktrace。因此必须修改consumer配置文件。

    java.lang.IllegalStateException: Failed to load ApplicationContext
    ...
    Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0'; nested exception is org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
    ...
    ... 24 more
    Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
    ...
    ... 36 more
    Caused by: org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    ...
    Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[agentpayqueryQueue2]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:587)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:486)
    ... 2 more
    Caused by: java.io.IOException
    ...
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'agentpayqueryQueue2' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 11 more
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'agentpayqueryQueue2' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:560)
    ... 1 more

     事先定义了queue的consumer配置文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
    
    
        <!--通过指定下面的admin信息,当前配置中定义的queue会在rabbitmq服务器上自动生成 -->
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <rabbit:queue id="agentpayqueryQueue2" name="agentpayqueryQueue2" durable="true" auto-delete="true"></rabbit:queue>
    
        <bean id="agentpayQueryConsumer" class="demo.TestMQConsumer" />
    
        <!--  监听器 -->
        <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"
                                   max-concurrency="20"
                                   concurrency="5"
                                   prefetch="10">
            <rabbit:listener ref="agentpayQueryConsumer" queues="agentpayqueryQueue2" />
        </rabbit:listener-container>
    </beans>

    2.4 队列的创建与修改

    1)上面定义的exchange和queue是在服务启动的时候创建的。

    2)我在测试时,上面缓冲队列agentpayqueryQueue1,起初没有配置<entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>,后来我要加上这个配置,而重启Producer服务后发现并没有生成这个x-message-ttl属性。也就是说,一旦队列存在,那么,将无法对其做改动。这种情况下,需要事先删掉agentpayqueryQueue1然后再重启Producer服务来重新创建agentpayqueryQueue1。**因为删掉队列会同时删掉队列里的消息(如果有),所以此功能要慎用,必要情况下需要中止生产端的消息入队操作,待队列重建完成后再恢复使用。 **当然,通常情况下,队列一旦创建,我们也不会对其做什么改动了。

     3. 结束

  • 相关阅读:
    Castle IOC容器内幕故事(下)
    Castle IOC容器实践之TypedFactory Facility(一)
    Castle ActiveRecord学习实践(9):使用ActiveRecord的一些技巧
    Web2.0改变了我的生活
    Castle IOC容器与Spring.NET配置之比较
    Castle IOC容器实践之Startable Facility(二)
    Castle IOC容器构建配置详解(二)
    Castle IOC容器实践之TypedFactory Facility(二)
    Castle IOC容器快速入门
    Castle IOC容器组件生命周期管理
  • 原文地址:https://www.cnblogs.com/buguge/p/10110932.html
Copyright © 2011-2022 走看看