zoukankan      html  css  js  c++  java
  • RabbitMQ与SpringBoot整合_消费端

    配置文件

    srcmain esourcesapplication.properties

    server.servlet.context-path=/
    server.port=8002
    
    spring.rabbitmq.addresses=192.168.11.71:5672,192.168.11.72:5672,192.168.11.71:5673
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    ##     表示消费者消费成功消息以后需要手工的进行签收(ack),默认为auto
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.concurrency=5
    spring.rabbitmq.listener.simple.max-concurrency=10
    spring.rabbitmq.listener.simple.prefetch=1
    
    
    ##    作业:
    ##    最好不要在代码里写死配置信息,尽量使用这种方式也就是配置文件的方式
    ##    在代码里使用     ${}    方式进行设置配置: ${spring.rabbitmq.listener.order.exchange.name}
    spring.rabbitmq.listener.order.exchange.name=order-exchange
    spring.rabbitmq.listener.order.exchange.durable=true
    spring.rabbitmq.listener.order.exchange.type=topic
    spring.rabbitmq.listener.order.exchange.key=order.*
    View Code

    消费端代码

    package com.bfxy.rabbit.consumer.component;
    
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    
    @Component
    public class RabbitReceive {
        
        /**
         *     组合使用监听
         *     @RabbitListener @QueueBinding @Queue @Exchange
         * @param message
         * @param channel
         * @throws Exception
         */
        @RabbitListener(bindings = @QueueBinding(
                        value = @Queue(value = "queue-1", durable = "true"),
                        exchange = @Exchange(name = "exchange-1",
                        durable = "true",
                        type = "topic",
                        ignoreDeclarationExceptions = "true"),
                        key = "springboot.*"
                    )
                )
        @RabbitHandler
        public void onMessage(Message message, Channel channel) throws Exception {
            //    1. 收到消息以后进行业务端消费处理
            System.err.println("-----------------------");
            System.err.println("消费消息:" + message.getPayload());
    
            //  2. 处理成功之后 获取deliveryTag 并进行手工的ACK操作, 因为我们配置文件里配置的是 手工签收
            //    spring.rabbitmq.listener.simple.acknowledge-mode=manual
            Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
        
        
        
    }
    View Code
  • 相关阅读:
    Features for Multi-Target Multi-Camera Tracking and Re-identification论文解读
    CBAM(Convolutional Block Attention Module)使用指南
    j2ee web项目 ssh 中使用junit测试
    log4j 发送日志到邮箱
    java.util.ConcurrentModificationException
    java 项目 报错
    json 传参数到action中 乱码
    TOMCAT 信息
    action 纯注解 笔记
    java 上传图片 打水印
  • 原文地址:https://www.cnblogs.com/callbin/p/14551321.html
Copyright © 2011-2022 走看看