zoukankan      html  css  js  c++  java
  • springboot+rabbitmq 之 消费端配置

    Chapter1

    直接上代码:

    @Slf4j
    @Component
    public class UserSettlementConsumer {
    
        @RabbitHandler
        @RabbitListener(queues = "${spring.rabbitmq.mq-name}")
        public void testListener(String msg) {
            log.info("消息出队:"+msg);
        }
    }

     可以看出来,RabbitMQ主要是借助于@RabbitHandler和@RabbitListener这两个注解来实现消息队列的消费。

    @RabbitHandler的javadoc注释:Annotation that marks a method to be the target of a Rabbit message listener within a class that is annotated with RabbitListener.
    @RabbitListener的javadoc注释:Annotation that marks a method to be the target of a Rabbit message listener on the specified queues() (or bindings()).

    Chapter2

    需要说明的是,在应用程序服务启动时,如果指定的消息在broker中不存在,则会导致mq初始化失败,服务也无法启动。错误信息:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'test.queue' in vhost '/', class-id=50, method-id=10)

    如下是详细的错误信息:

    2021-06-15 15:13:14.997 [][] [main] INFO  o.s.a.rabbit.connection.CachingConnectionFactory:497 - Created new connection: connectionFactory#53ea380b:0/SimpleConnection@5b1cedfd [delegate=amqp://rabbitadmin@192.168.40.20:5672/, localPort= 1142]
    2021-06-15 15:13:15.155 [][] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] WARN  o.s.amqp.rabbit.listener.BlockingQueueConsumer:707 - Failed to declare queue: test.queue
    2021-06-15 15:13:15.155 [][] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] WARN  o.s.amqp.rabbit.listener.BlockingQueueConsumer:641 - Queue declaration failed; retries left=3
    org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[test.queue]
        at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:713)
        at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:597)
        at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:584)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1338)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1183)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.IOException: null
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
        at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006)
        at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52)
        at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.queueDeclarePassive(PublisherCallbackChannelImpl.java:363)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1190)
        at com.sun.proxy.$Proxy252.queueDeclarePassive(Unknown Source)
        at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:692)
        ... 5 common frames omitted
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'test.queue' in vhost '/', class-id=50, method-id=10)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
        ... 15 common frames omitted

    错误描述很明显,是要求指定的队列必须存在。

    解决办法有2个:

    • 一是利用@Bean注解来声明queue。
    • 二是借助于@RabbitListener的成员:queuesToDeclare

    @Bean声明方式如下:

    @Configuration
    public class DirectRabbitConfig {
    
        @Value("${spring.rabbitmq.mq-name}")
        private String mqName;
    
        @Bean
        public Queue myQueue() {
            return new Queue(mqName, true);
        }
    }

    org.springframework.amqp.rabbit.annotation.RabbitListener#queuesToDeclare 可以在队列不存在时直接创建:

    @Slf4j
    @Component
    public class UserSettlementConsumer {
    
        @RabbitHandler
        @RabbitListener(queuesToDeclare ={@Queue(name = "${spring.rabbitmq.mq-name}",durable = "true")})
        public void testListener(String msg) {
            log.info("消息出队:"+msg);
        }
    }

     如下是spring-rabbit-2.2.0.RELEASE.jar里org.springframework.amqp.rabbit.annotation.RabbitListener#queuesToDeclare的javadoc:

    package org.springframework.amqp.rabbit.annotation;
    public @interface RabbitListener {
    
        /**
         * The queues for this listener.
         * If there is a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the
         * application context, the queue will be declared on the broker with default
         * binding (default exchange with the queue name as the routing key).
         * Mutually exclusive with {@link #bindings()} and {@link #queues()}.
         * @return the queue(s) to declare.
         * @see org.springframework.amqp.rabbit.listener.MessageListenerContainer
         * @since 2.0
         */
        Queue[] queuesToDeclare() default {};
    
    }

    Chapter3

    回过头来介绍RabbitListener#queues()

    package org.springframework.amqp.rabbit.annotation;
    public @interface RabbitListener {
    
        /**
         * The queues for this listener.
         * The entries can be 'queue name', 'property-placeholder keys' or 'expressions'.
         * Expression must be resolved to the queue name or {@code Queue} object.
         * The queue(s) must exist, or be otherwise defined elsewhere as a bean(s) with
         * a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the application
         * context.
         * Mutually exclusive with {@link #bindings()} and {@link #queuesToDeclare()}.
         * @return the queue names or expressions (SpEL) to listen to from target
         * @see org.springframework.amqp.rabbit.listener.MessageListenerContainer
         */
        String[] queues() default {};
    
    }

    从以上RabbitListener#queues()的javadoc内容可以看出来如下三点信息,其中第2条指明了队列必须存在:

    1. queues的取值可以是常量(如 MessageQueueConstant.USER_QUEUE),可以是属性占位符(如 "${spring.rabbitmq.mq-name}"),可以是SpEL表达式(如 "#{configToolkitProp['zk.address']}"、"#{userQueue.name}")
    2. 所指定的队列必须存在,或者是ApplicationContext里的一个具有org.springframework.amqp.rabbit.core.RabbitAdmin的bean。
    3. queues()与bindings()和queuesToDeclare()是互斥的。设定了queues(),就不能再设定bindings()和queuesToDeclare()了。

    Chapter4

    在服务运行过程中,如果broker里的交换机或队列被删掉了,同样会导致无法生产消息。如下是错误信息。这时,只能重启服务或者手动在broker里创建所需的交换机和队列。
    2021-06-17 15:13:18.782 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test-USER_SETTLEMENT_EXCHANGE' in vhost '/', class-id=60, method-id=40)

  • 相关阅读:
    Nginx反向代理和jetty服务器配置
    如何使用canvas绘图
    毕业后,你折腾了多久做了多少努力才找到正确的方向或者道路?
    如何提高用户逃离成本
    首次创业者必须知道哪些基本常识?
    拦截器、过滤器、监听器各有什么作用
    第一人称入行分享贴:大学混了四年,如何顺利入行互联网
    线下学习
    如何实现数组深拷贝和浅拷贝?
    从零学前端第二讲:CSS行内块级元素布局与定位
  • 原文地址:https://www.cnblogs.com/buguge/p/14887177.html
Copyright © 2011-2022 走看看