zoukankan      html  css  js  c++  java
  • RabbitMQ入门实战(3)--SpringBoot整合RabbitMQ

    Spring AMQP是基于Spring框架的AMQ 消息解决方案,提供模板化发送和接收消息的抽象层,提供基于消息驱动的POJO。本文主要介绍在SpringBoot中用Spring AMQP操作RabbitMQ,文中使用到的软件版本:RabbitMQ 3.8.9、SpringBoot 2.2.5.RELEASE、Java 1.8.0_191。

    1、Spring AMQP简介

    1.1、Spring AMQP主要对象类及作用

    作用
    Queue 对应RabbitMQ中Queue
    AmqpTemplate 接口,用于向RabbitMQ发送和接收Message
    RabbitTemplate AmqpTemplate的实现类
    @RabbitListener 指定消息接收方,可以配置在类和方法上
    @RabbitHandler 指定消息接收方,只能配置在方法上,可以与@RabbitListener一起使用
    Message 对RabbitMQ消息的封装
    Exchange 对RabbitMQ的Exchange的封装,子类有TopicExchange、FanoutExchange和DirectExchange等
    Binding 将一个Queue绑定到某个Exchange,本身只是一个声明,并不做实际绑定操作
    AmqpAdmin 接口,用于Exchange和Queue的管理,比如创建/删除/绑定等,自动检查Binding类并完成绑定操作
    RabbitAdmin AmqpAdmin的实现类
    ConnectionFactory

    创建Connection的工厂类,RabbitMQ也有一个名为ConnectionFactory的类但二者没有继承关系,

    Spring ConnectionFactory可以认为是对RabbitMQ ConnectionFactory的封装

    CachingConnectionFactory Spring ConnectionFactory的实现类,可以用于缓存Channel和Connection
    Connection

    Spring中用于创建Channel的连接类,RabbitMQ也有一个名为Connection的类,

    但二者没有继承关系,Spring Connection是对RabbitMQ Connection的封装

    SimpleConnection Spring Connection的实现类,将实际工作代理给RabbitMQ的Connection类
    MessageListenerContainer 接口,消费端负责与RabbitMQ服务器保持连接并将Message传递给实际的@RabbitListener/@RabbitHandler处理
    RabbitListenerContainerFactory 接口,用于创建MessageListenerContainer
    SimpleMessageListenerContainer MessageListenerContainer的实现类
    SimpleRabbitListenerContainerFactory RabbitListenerContainerFactory的实现类
    RabbitProperties 用于配置Spring AMQP的Property类

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

    1.2、Spring AMQP主要参数

    参数 默认值 说明
    基础信息    
    spring.rabbitmq.host localhost 主机
    spring.rabbitmq.port 5672 端口
    spring.rabbitmq.username guest 用户名
    spring.rabbitmq.password guest 密码
    spring.rabbitmq.virtual-host   虚拟主机
    spring.rabbitmq.addresses   server的地址列表(以逗号分隔),配置了该项将忽略spring.rabbitmq.host和spring.rabbitmq.port
    spring.rabbitmq.requested-heartbeat   请求心跳超时时间,0表示不指定;如果后面没加时间单位默认为秒
    spring.rabbitmq.publisher-confirm-type none

    发布确认类型,none、correlated、simple

    该配置只管有无投递到exchange,而不管有无发送到队列当中

    spring.rabbitmq.publisher-returns false 是否启用发布返回
    spring.rabbitmq.connection-timeout   连接超时时间,0表示永不超时
    缓存cache    
    spring.rabbitmq.cache.channel.checkout-timeout   如果已达到channel缓存大小,等待获取channel的时间。 如果为0,则始终创建一个新channel。
    spring.rabbitmq.cache.channel.size   缓存中保持的channel数量
    spring.rabbitmq.cache.connection.size   缓存的connection数,只有是CONNECTION模式时生效
    spring.rabbitmq.cache.connection.mode channel 连接工厂缓存模式
    Listener    
    spring.rabbitmq.listener.type simple 容器类型,simple或direct
    spring.rabbitmq.listener.simple.auto-startup true 应用启动时是否启动容器
    spring.rabbitmq.listener.simple.acknowledge-mode auto 消息确认方式,none、manual和auto
    spring.rabbitmq.listener.simple.concurrency   listener最小消费者数
    spring.rabbitmq.listener.simple.max-concurrency   listener最大消费者数
    spring.rabbitmq.listener.simple.prefetch   一个消费者最多可处理的nack消息数量
    spring.rabbitmq.listener.simple.default-requeue-rejected true 被拒绝的消息是否重新入队
    spring.rabbitmq.listener.simple.missing-queues-fatal true 如果容器声明的队列不可用,是否失败;或如果在运行时删除一个或多个队列,是否停止容器
    spring.rabbitmq.listener.simple.idle-event-interval   空闲容器事件应多久发布一次
    spring.rabbitmq.listener.simple.retry.enabled false 是否开启消费者重试
    spring.rabbitmq.listener.simple.retry.max-attempts 3 最大重试次数
    spring.rabbitmq.listener.simple.retry.max-interval 10000ms 最大重试间隔
    spring.rabbitmq.listener.simple.retry.initial-interval 1000ms 第一次和第二次尝试发送消息的时间间隔
    spring.rabbitmq.listener.simple.retry.multiplier 1.0 应用于前一个重试间隔的乘数
    spring.rabbitmq.listener.simple.retry.stateless true 重试是无状态还是有状态
         
    spring.rabbitmq.listener.direct.consumers-per-queue   每个队列消费者数量
    direct类型listener其他参数同simple类型    
    Template    
    spring.rabbitmq.template.mandatory false

    消息在没有被队列接收时是否退回,与spring.rabbitmq.publisher-returns类似,
    该配置优先级高于spring.rabbitmq.publisher-returns

    spring.rabbitmq.template.receive-timeout   receive() 操作的超时时间
    spring.rabbitmq.template.reply-timeout   sendAndReceive() 操作的超时时间
    spring.rabbitmq.template.retry.enabled false 发送消息是否重试
    spring.rabbitmq.template.retry.max-attempts 3.0 发送消息最大重试次数
    spring.rabbitmq.template.retry.initial-interval 1000ms 第一次和第二次尝试发送消息的时间间隔
    spring.rabbitmq.template.retry.multiplier 1.0 应用于前一个重试间隔的乘数
    spring.rabbitmq.template.retry.max-interval 10000ms 最大重试间隔

    2、SpringBoot整合RabbitMQ

    2.1、引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2.2、增加rabbit配置

    spring:
      rabbitmq:
        host: 10.49.196.10
        port: 5672
        virtual-host: /
        username: guest
        password: guest
        publisher-confirm-type: simple #消息发送确认模式 none、correlated、simple;Confirm模式只管有无投递到exchange,而不管有无发送到队列当中
        publisher-returns: true #当消息未投递到queue时消息退是否回
        template:
          mandatory: true #消息在没有被队列接收时是否退回,与spring.rabbitmq.publisher-returns类似,该配置优先级高于spring.rabbitmq.publisher-returns
        listener:
          type: simple #容器类型
          simple:
            acknowledge-mode: manual #消息消费确认模式

    2.3、配置类

    package com.abc.demo.general.rabbit.springboot;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
        public static String EXCHANGE_NAME = "exchange.log";
        public static final String QUEUE_NAME = "queue.file";
    
        /**
         * 声明direct类型交换机
         * @return
         */
        @Bean("exchangeLog")
        public Exchange exchange() {
            return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
        }
    
        /**
         * 声明队列
         * @return
         */
        @Bean("queueFile")
        public Queue queue(){
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
    
        /**
         * 绑定队列和交换机
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding bindingWarn(@Qualifier("queueFile") Queue queue, @Qualifier("exchangeLog") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("warn").noargs();
        }
    
        @Bean
        public Binding bindingInfo(@Qualifier("queueFile") Queue queue, @Qualifier("exchangeLog") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("info").noargs();
        }
    }

    2.4、生产者

    package com.abc.demo.general.rabbit.springboot;
    
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Producer implements InitializingBean {
        private static Logger logger = LoggerFactory.getLogger(Producer.class);
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private static int index = 0;
    
        @Scheduled(cron = "0/5 * * * * *")
        public void sendMsg() {
            String msg = "消息-" + index++;
            logger.info("发送消息:{}", msg);
            String routingKey = "";
            if (index % 3 == 0) {
                routingKey = "warn";
            } else if(index % 3 == 1) {
                routingKey = "info";
            }  else {
                routingKey = "debug";
            }
    
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, routingKey, msg);
        }
    
        /**
         * 发送消息确认
         * spring.rabbitmq.publisher-confirm-type=correlated
         */
        //@Scheduled(cron = "0/5 * * * * *")
        public void sendMsgConfirmCorrelated() {
            index++;
            String msg = "消息-" + index;
            logger.info("发送消息:{}", msg);
            String routingKey = "";
            if (index % 3 == 0) {
                routingKey = "warn";
            } else if(index % 3 == 1) {
                routingKey = "info";
            }  else {
                routingKey = "debug";
            }
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    logger.info("消息:{},发送成功。", correlationData);
                } else {
                    logger.info("消息:{},发送失败。原因:{}", correlationData, cause);
                }
            });
    
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(index + "");
            //用不存在的exchange测试
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME + "2", routingKey, msg, correlationData);
        }
    
        /**
         * 发送消息确认
         * spring.rabbitmq.publisher-confirm-type=simple
         */
        //@Scheduled(cron = "0/5 * * * * *")
        public void sendMsgConfirmSimple() {
            index++;
            String msg = "消息-" + index;
            logger.info("发送消息:{}", msg);
            String routingKey = "";
            if (index % 3 == 0) {
                routingKey = "warn";
            } else if(index % 3 == 1) {
                routingKey = "info";
            }  else {
                routingKey = "debug";
            }
    
            final String rk = routingKey;
            rabbitTemplate.invoke(operations -> {
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, rk, msg);
                //rabbitTemplate.waitForConfirmsOrDie(2000);
                boolean result = rabbitTemplate.waitForConfirms(2000);
                if (result) {
                    logger.info("消息发送成功!");
                } else {
                    logger.info("消息发送失败!");
                }
                return null;
            });
        }
    
        @Override
        public void afterPropertiesSet() {
            /**
             * 消息未发送到queue反馈,可以用不存在的routingKey来测试
             * spring.rabbitmq.publisher-returns=true
             */
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                logger.warn("返回消息配置:{}", message.getMessageProperties().toString());
                logger.warn("反馈代码:{}", replyCode);
                logger.warn("反馈内容:{}", replyText);
                logger.warn("exchange:{}", exchange);
                logger.warn("routingKey:{}", routingKey);
            });
        }
    }

    2.5、消费者

    package com.abc.demo.general.rabbit.springboot;
    
    import com.rabbitmq.client.Channel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    public class Consumer {
        private static Logger logger = LoggerFactory.getLogger(Consumer.class);
    
        @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
        public void process(Message message, Channel channel) throws IOException {
            try {
                logger.info("接受到消息:" + new String(message.getBody()));
                channel.basicQos(1);//业务处理...
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        }
    }
  • 相关阅读:
    servlet的之前与之后的基本使用
    java HashMap插入重复Key值问题
    ConcurrentHashMap底层实现原理(JDK1.7 & 1.8)
    spring cloud实现热加载
    spring cloud各个组件以及概念的解释和基本使用
    深入理解java 虚拟机 jvm高级特性与最佳实践目录
    【leetcode】1、两数之和
    【Java 基础领域】二维数组创建内存图
    【Java EE领域】com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'salary' in 'fi
    【JavaEE领域】com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'mp.employee' doesn't exi
  • 原文地址:https://www.cnblogs.com/wuyongyin/p/13957556.html
Copyright © 2011-2022 走看看