zoukankan      html  css  js  c++  java
  • RabbitMQ学习总结(3)-集成SpringBoot

    1. pom.xml引用依赖

      SpringBoot版本可以自由选择,我使用的是2.1.6.RELEASE,使用starter-web是因为要使用Spring的相关注解,所以要同时加上。

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- RabbitMQ引用 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>

      starter-amqp是RabbitMQ的主要依赖,这个包内都依赖了以下内容:

    [INFO] +- org.springframework.boot:spring-boot-starter-amqp:jar:2.1.6.RELEASE:compile
    [INFO] |  +- org.springframework:spring-messaging:jar:5.1.8.RELEASE:compile
    [INFO] |  - org.springframework.amqp:spring-rabbit:jar:2.1.7.RELEASE:compile
    [INFO] |     +- org.springframework.amqp:spring-amqp:jar:2.1.7.RELEASE:compile
    [INFO] |     |  - org.springframework.retry:spring-retry:jar:1.2.4.RELEASE:compile
    [INFO] |     +- com.rabbitmq:amqp-client:jar:5.4.3:compile
    [INFO] |     - org.springframework:spring-tx:jar:5.1.8.RELEASE:compile

      因为同时依赖了spring-rabbit.jar和amqp-client.jar,所以可以使用Spring方式继承,可以使用原始API调用MQ。下面我只讲继承Spring的方法。

    2. 消息生产者代码

    2.1 基础配置类(这里配置的交换机、队列、绑定关系,如果RabbitMQ中不存在会被创建。如果已存在但是和你配置的不一样会再启动时报错)

    /**
     * RabbitMQ的配置类
     */
    @Configuration
    public class RabbitMqConfig {
    
        /**
         * 创建RabbitMQ的连接工厂
         * @return ConnectionFactory
         */
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory  factory = new CachingConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            // 虚拟地址,类型与MySQL的库,是相互独立的,可以在MQ管理后台创建
            factory.setVirtualHost("myhosts");
            //是否开启消息确认机制:消息确认、失败回调
            factory.setPublisherConfirms(true);
            return factory;
        }
    
        /**
         * 创建RabbitTemplate,用于访问RabbitMQ
         * 如果有多个回调业务,要把此方法设置为非单例模式,在使用@PostConstruct在调用时注入ConfirmCallback方法
         * @return RabbitTemplate
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate template = new RabbitTemplate();
            template.setConnectionFactory(connectionFactory);
            // 1. 消息发送确认,这块处理的是消息到交换机之间的逻辑
            template.setConfirmCallback(new MyConfirmCallback());
            // 2. 开启失败回调,处理的是交换机到队列之间的逻辑
            template.setMandatory(true);
            template.setReturnCallback(new MyReturnCallback());
            // 消息转换器,可以统一处理消息格式
            template.setMessageConverter(new MyMessageConverter());
            return template;
        }
    
        /**
         * 默认交换机
         * @return DirectExchange
         */
        @Bean
        public DirectExchange defaultExchange() {
            Map<String, Object> map = new HashMap<>();
            // 声明备用交换机,只能在声明交换机时,才能声明备用交换机
            // 如果被交换机没有被路由到,就会启用备用交换机
            // 备用交换机需要提前创建
            map.put("alternate-exchange", "exchangeTest");
            // 参数1:交换机名称   参数2:是否持久化  参数3:是否自删除  参数4: 配置参数
            return new DirectExchange("directExchange", false, false, map);
        }
    
        /**
         * 默认队列
         * @return
         */
        @Bean
        public Queue queue() {
            // 参数:队列名称,是否持久化,是否排他队列,是否自动删除,设置参数
            return new Queue("testQueue", true, false, false, null);
        }
    
        /**
         * 绑定关系
         * @return
         */
        @Bean
        public Binding binding() {
            //绑定一个队列  to: 绑定到哪个交换机上面 with:绑定的路由建(routingKey)
            return BindingBuilder.bind(queue()).to(defaultExchange()).with("direct.key");
        }
    
    
    }

    2.2 消息发送确认

    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * 消息发送回调方法(用于确保消息是否发送到交换机)
     */
    @Component
    public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
    
        /**
         * 消息发送确认
         * @param correlationData SpringBoot提供的业务标识对象(建议在发送消息时,提供这个对象,方便失败时处理)
         * @param ack   有没有成功发送到MQ
         * @param cause 失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // 按照业务处理具体业务逻辑,比如错误消息入库, 重新发送等。
            System.out.println("Confirm=====  data=" + correlationData.toString() + " ,ack=" + ack + ",  cause="+cause);
        }
    
    }

    2.3 消息失败回调

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    /**
     * 失败回调(出现这个问题是消息没用从交换机进入到队列里)*/
    @Component
    public class MyReturnCallback implements RabbitTemplate.ReturnCallback {
    
        /**
         * 返回消息:失败回调出现了的都是比较严重的问题,所以返回信息比较多(失败原因可能是路由键不存在,通道未绑定等等)
         * @param message  发送消息 "hello" + 发送消息的配置
         * @param replyCode 状态码:成功是200
         * @param replyText 失败的信息
         * @param exchange  交换机
         * @param routingKey 路由键
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            StringBuffer buffer = new StringBuffer();
            buffer.append("returnedMessage=======" + new String(message.getBody()));
            buffer.append(", replyCode=" + replyCode);
            buffer.append(", replyText=" + replyText);
            buffer.append(", exchange=" + exchange);
            buffer.append(",routingKey=" + routingKey);
            System.out.println(buffer.toString());
        }
    }

    2.5 消息格式化

      因为RabbitMQ默认的消息格式是byte类型,Spring容器在消费者消费消息时,会自动把byte转成String类型,如果用String类型接收消息,可以直接输出。但如果是Message类型接收消息,就会显示乱码。所以要统一封装成Message。

    import com.alibaba.fastjson.JSON;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    
    /**
     * 消息格式化
     */
    public class MyMessageConverter implements MessageConverter {
    
        /**
         * 发送消息,要重写的方法,把Object转成Message
         * @param o   消息内容
         * @param messageProperties 消息参数
         * @return Message
         * @throws MessageConversionException
         */
        @Override
        public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
            // 默认格式SpringBoot会转换一次,所以直接接受String输出的是字节码。设置text格式后,转换时会默认用UTF-8转码数据
            messageProperties.setContentType("text/xml");
            messageProperties.setContentEncoding("UTF-8");
            byte[] bytes;
            if (o instanceof String){
                bytes = ((String) o).getBytes();
            } else {
                bytes = JSON.toJSONBytes(o);
            }
            Message message = new Message(bytes, messageProperties);
            return message;
        }
    
        /**
         * 接收消息,要重写的方法,可以把Message转成成任意类型
         * @param message 消息内容
         * @return Object
         * @throws MessageConversionException
         */
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            return null;
        }
    }

    2.4 发送者代码

    @Component
    public class MessageProducer {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        public void testSend() {
            // 业务标识,发送失败时才有用。
            CorrelationData data = new CorrelationData("order_no_1111");
            // 参数1:交换机名字 参数2:路由键  参数3:消息内容
            rabbitTemplate.convertAndSend("directExchange", "direct.key", "hello", data);
        }
    
    }

      使用这个类测试发送消息,消息发送成功的日志:Confirm=====  data=CorrelationData [id=order_no_1111] ,ack=true,  cause=null

      把消息生产者的routingKey修改一个不存在,再次测试,就可以看到消息失败回调的日志。

      消息失败回调的日志:returnedMessage=======hello, replyCode=312, replyText=NO_ROUTE,  exchange=directExchange, routingKey=direct.key3

      不论是消息发送确认失败,还是消息失败回调,都需要按照自己的业务逻辑把信息记录下来,再准备补偿机制处理。

    3. 消息消费者代码

    3.1 基础配置类

      基础配置类与消息生产者代码是一样,只是RabbitTemplate方法中,不再需要消息确认、失败回调,删除即可。

    3.2 监听器的容器工厂

      这个配置类是可以直接定义消费者的一些行为。在@RabbitListener注解上使用。

        /**
         * 监听消费者行为
         * @return SimpleRabbitListenerContainerFactory
         */
        @Bean
        public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory());
            // 确认方式: NONE-不确认,MANUAL-手动确认, AUTO-自动确认
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // 消息预期数量
            factory.setPrefetchCount(1);
            return factory;
        }

    3.3 消费者的消息监听器

      使用@RabbitListener注解指定要监听的队列。如果有2个监听器监听一个队列,RabbitMQ默认会以轮训的方式把消息分发给两个监听器。(比如消费者的服务,部署了2台)

      关于接受的消息格式问题,Listener支持两种2格式:String和Message

      String类型:因为RabbitMQ默认的消息格式是byte,但使用String类型可以直接消息,是因为Spring给用户做了一次new String(byte[])的操作。

      Message类型:如果直接使用Message类型接收参数,会出现数字乱码,因为消息内容已经被Spring把message转成String处理过一次。所以在发送消息时,使用MessageConverter格式化之后,message.getBody()得到的byte[] 在转成String类型即可。

    /**
     * 消息监听器
     *
     * 预取数量:(假设有两个消费者,100条消息,队列会按照轮训的规则,把所有消息轮训发给2个消费者,如果一个消费快,一个消费慢,就造成了消费快的服务一直很闲,消费慢的进程压力过大。)
     *            现象就是队列已经清零了,消费慢的线程还在慢慢的消费剩余的消息
     *
     * @author he.zhang
     * @date 2020/3/29 23:26
     */
    @Component
    public class MessageListener {
    
        /**
         * 消费者,监听testQueue队列
         * @param message 消息内容,此处接收的是Message对象,需要把body转码。也可以使用MessageConverter 来自动统一处理
         * @throws Exception
         */
        @RabbitListener(queues = "testQueue", containerFactory = "simpleRabbitListenerContainerFactory")
        public void get(Message message, Channel channel) throws Exception {
            System.out.println("消费者1:" + new String(message.getBody(), "UTF-8"));
            // 消费预取数量,一次性拉取多少条数据。最多同时接收多少条数据。必须手动确认,不能自动确认。
            // 可以和批量确认一起使用
            channel.basicQos(10);
    
            // 假设消息是下订单的内容,下单成功的消息,手动确认
            if (placeOrder()){
                // 手动确认配置 参数1:消息唯一标识  参数2:是否批量确认
                // 建议开通批量确认,唯一标识是MQ自动生成的,传最后一次确认的ID,会把之前的全都确认
                // 开启批量确认需要提供个判断条件,并且要做好幂等性处理。(因为在未确认之前,连接中断的话,会造成重复消费的问题)
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
            // 下单失败的消息,采用消息退回策略
            else {
                // 批量消息退回
                // 参数1:消息标识
                // 参数2:是否批量退回,如果想要单条退回就用false
                // 参数3:是否回到消息队列,如果要废弃消息使用false,如果退回的可能会有重复消费可能,需要同时开通其他消费者处理。
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                // 单条消息退回,最早按照AMQP定义的接口,现在不经常用了
    //            channel.basicReject();
            }
        }/**
         * 消费者2,监听testQueue队列,如果两个监听器同时监听一个通道,两个线程轮训消费消息
         * @param message 消息内容,String类型,直接输出
         * @throws Exception
         */
        @RabbitListener(queues = "testQueue")
        public void get2(String message) throws Exception {
            System.out.println("消费者2:" + message);
        }
    
    }
  • 相关阅读:
    linux命令整理
    各种提权姿势总结
    常用端口信息说明和利用
    近年来爆发的CVE漏洞编号
    一个优秀的SSH远程终端工具
    python-读写文件的方式
    kali安装ssh服务
    一套实用的渗透测试岗位面试题
    使用 python快速搭建http服务
    asciinema使用
  • 原文地址:https://www.cnblogs.com/huanshilang/p/12701167.html
Copyright © 2011-2022 走看看