zoukankan      html  css  js  c++  java
  • RabbitMQ(五)Springboot集成RabbitMQ

     

    基本概念

    • @EnableRabbit

    @EnableRabbit和@Configuration一起使用,可以加在类或者方法上,这个注解开启了容器对注册的bean的@RabbitListener检查。

    • @RabbitListener

    @RabbitListener用于注册Listener时使用的信息:如queue,exchange,key、ListenerContainerFactory和RabbitAdmin的bean name。

    扫描到bean带有该注解后,首先会将注解的内容封装到Endpoint对象中并和ListenerContainerFactory的实例一起添加到上面的RabbitListenerEndpointRegistry实例中。添加的时候会创建相应的ListenerContainer实例并添加Listener对象。
    • @RabbitHandler

    @RabbitListener 和 @RabbitHandler结合使用,不同类型的消息使用不同的方法来处理。

     

    依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    配置

    • application.yml配置文件:
    spring:
      rabbitmq:
        host: 192.168.58.129
        port: 5672
        username: orcas
        password: 1224
        virtual-host: /
        connection-timeout: 15000
        publisher-confirms: true
        publisher-returns: true
        template:
          mandatory: true
        listener:
          simple:
            acknowledge-mode: manual # 手动ack
            concurrency: 5 # 监听消息的个数
            max-concurrency: 10 
          # 自定义mq配置 用于声明交换机、队列、绑定路由的参数
          order:
            queue:
              name: queue-2
              durable: true
            exchange:
              name: exchange-2
              durable: true
              type: topic
              ignoreDeclarationExceptions: true
            key: springboot.*
    • publisher-confirms,实现一个监听器用于监听Broker端返回的确认请求。
    • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达情况,则使用监听器对不可达消息进行后续处理,保证消息的路由成功。
    • template.mandatory,true则监听器会接收到路由不可达的消息,然后进行处理;false则Broker会自动删除该消息。默认是false。

    注:也可以参考与Spring整合中的配置文件,就是以@Bean的方式声明交换机、队列与绑定关系。

    生产者

    @Component
    public class RabbitSender {
    
        //自动注入RabbitTemplate模板类
        @Autowired
        private RabbitTemplate rabbitTemplate;  
        
        //回调函数: confirm确认
        final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationData: " + correlationData);
                System.err.println("ack: " + ack);
                if(!ack){
                    System.err.println("异常处理....");
                }
            }
        };
        
        //回调函数: return返回
        final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText,
                    String exchange, String routingKey) {
                System.err.println("return exchange: " + exchange + ", routingKey: " 
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };
        
        //发送消息方法调用: 构建Message消息
        public void send(Object message, Map<String, Object> properties) throws Exception {
            MessageHeaders mhs = new MessageHeaders(properties);
            Message msg = MessageBuilder.createMessage(message, mhs);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            // 设置消息的唯一id
            CorrelationData correlationData = new CorrelationData("1234567890"); //id + 时间戳 全局唯一 
            rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
        }
        
        //发送消息方法调用: 构建自定义对象消息
        public void sendOrder(Order order) throws Exception {
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            //id + 时间戳 全局唯一 
            CorrelationData correlationData = new CorrelationData("0987654321");
            rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
        }
    }

    消费者

    @Exchange@Queue@QueueBinding 组合注解用来声明交换机、队列和绑定路由。

    @Component
    public class RabbitReceiver {
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
                durable="${spring.rabbitmq.listener.order.queue.durable}"),
                exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
                durable="${spring.rabbitmq.listener.order.exchange.durable}", 
                type= "${spring.rabbitmq.listener.order.exchange.type}", 
                ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
                key = "${spring.rabbitmq.listener.order.routingKey}"
                )
        )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception { System.err.println("--------------------------------------"); System.err.println("消费端order: " + order.getId()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); //手工ACK channel.basicAck(deliveryTag, false); } }

    引用:

     https://www.javatt.com/p/11158

  • 相关阅读:
    Access restriction on class due to restriction on required library rt.jar?
    Why “no projects found to import”?
    MySQL
    您对无法重新创建的表进行了更改或者启用了“阻止保存要求重新创建表的更改”选项
    INTJINTJ——内向+直觉+思维+判
    豆瓣网案例分析报告
    如何使用Git
    如何在不到六个月的时间内成为一个开发者
    关于网站编程Alex
    string
  • 原文地址:https://www.cnblogs.com/caoweixiong/p/12911192.html
Copyright © 2011-2022 走看看