zoukankan      html  css  js  c++  java
  • RabbitMQ--实战

    一、原生代码小Demo

      pom:

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.9.0</version>
            </dependency>

      producer:

            // 1. 创建一个 ConnectionFactory 工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.124.8");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("lcl");
            factory.setPassword("123456");
            // 2. 创建一个 Connection
            Connection conn = factory.newConnection();
            // 3. 获取一个信道 Channel
            Channel channel = conn.createChannel();
            // 声明一个交换机
            channel.exchangeDeclare("cc", BuiltinExchangeType.DIRECT,true);
            // 4. 通过 Channel 发送消息
            String msg = "cuihuaaaa";
            for (int i = 0; i < 100; i++) {
                channel.basicPublish("cc", "hello", null, msg.getBytes());
                System.out.println(i);
            }
            // 5. 关闭资源
            channel.close();
            conn.close();

      consumer:

            // 1. 创建 ConnetionFactory 连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //factory.setUsername("guest");
            //factory.setPassword("guest");
            factory.setHost("192.168.124.8");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("lcl");
            factory.setPassword("123456");
            // 2. 获取 Connection 连接对象
            Connection connection = factory.newConnection();
            // 3. 创建 Channel 信道
            Channel channel = connection.createChannel();
            // 声明交换机
            String exchangeName = "aa";
            channel.exchangeDeclare(exchangeName, "direct", true);
            // 4. 声明队列
            String queueName = channel.queueDeclare().getQueue();
            String routingKey = "msg02";
            channel.queueBind(queueName, exchangeName, routingKey);
            // 5. 消费消息
            while (true){
                boolean autoAck = false;
                String consmerTag = "";
                channel.basicConsume(queueName, autoAck, consmerTag, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // super.handleDelivery(consumerTag, envelope, properties, body);
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        System.out.println("消费的路由键:" + routingKey + " 消费的内容类型:" + contentType);
                        long deliveryTag = envelope.getDeliveryTag();
                        // 确认消息
                        channel.basicAck(deliveryTag, false);
                        System.out.println("消费的消息体:");
                        String bodyMsg = new String(body, "UTF-8");
                        System.out.println(bodyMsg);
    
                    }
                });
            }
        }

    二、不同类型交换机的使用及Springboot集成RabbitMQ

      pom:

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

      application.yml:

    spring:
      #配置rabbitMq 服务器
      rabbitmq:
        host: 192.168.124.8
        port: 5672
        username: lcl
        password: 123456
        #虚拟host 可以不设置,使用server默认host
        virtual-host: /

      然后就是具体的实现了。

    (一)直连型交换机Direct Exchange

      直连交换机就是一对一,一个交换机对应一个队列,然后消费者消费指定队列的消息。

      1、创建配置文件

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DirectRabbitConfig {
        //队列 起名:TestDirectQueue
        @Bean
        public Queue TestDirectQueue() {
            // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
            // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
            // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
            //   return new Queue("TestDirectQueue",true,true,false);
            //一般设置一下队列的持久化就好,其余两个就是默认false
            return new Queue("TestDirectQueue",true);
        }
        //Direct交换机 起名:TestDirectExchange
        @Bean
        DirectExchange TestDirectExchange() {
            //  return new DirectExchange("TestDirectExchange",true,true);
            return new DirectExchange("TestDirectExchange",true,false);
        }
        //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
        @Bean
        Binding bindingDirect() {
            return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
        }
        @Bean
        DirectExchange lonelyDirectExchange() {
            return new DirectExchange("lonelyDirectExchange");
        }
    }

        (1)对于创建队列,源码如下

            durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效

            exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable

            autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。

          一般设置一下队列的持久化就好,其余两个就是默认false

          

        (2) 对于创建交换机,源码如下:

            durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效

            autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。

          

         (3)交换机和队列绑定

      2、Producer

    @RestController
    public class SendMessageApi {
        @Autowired
        RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法
        @GetMapping("/direct")
        public String sendDirectMessage() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "test message, hello!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String,Object> map=new HashMap<>();
            map.put("messageId",messageId);
            map.put("messageData",messageData);
            map.put("createTime",createTime);
            //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
            rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
            return "ok";
        }
    }

      3、consumer

      这里也需要配置相关的ip和用户等信息,我这里是将生产者和消费者放到了同一个项目中,上面已经配置过,这里就不需要在配置了。

    @Component
    @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
    public class DirectConsumer {
        @RabbitHandler
        public void process(Map testMessage) {
            System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());
        }
    }

    (二)主题交换机Topic Exchange

      主题交换机与直连交换机的区别就是可以一对多,也就是一个交换机对应多个队列,然后消费者消费指定队列的消息。

      这里模拟分别有三个队列,分别是topic.man、topic.woman、topic.man.lcl,统一都绑定到一个交换机上:

        一个是用全匹配绑定(topic.man),只有发送的routingkey为topic.man时,才会被路由到该队列

        一个是#模糊匹配(topic.#),也就只要routingkey为topic.(后面只有一个单词),则会被路由到队列

        一个是*模糊匹配(topic.*),也就是只要routingkey为topic.开头(后面可以有任意个单词),则会被路由到该队列

      1、创建配置文件

    @Configuration
    public class TopicRabbitConfig {
        //绑定键
        public final static String man = "topic.man";
        public final static String woman = "topic.woman";
        public final static String lcl = "topic.man.lcl";
        @Bean
        public Queue firstQueue() {
            return new Queue(TopicRabbitConfig.man);
        }
        @Bean
        public Queue secondQueue() {
            return new Queue(TopicRabbitConfig.woman);
        }
        @Bean
        public Queue lclQueue() {
            return new Queue(TopicRabbitConfig.lcl);
        }
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("topicExchange");
        }
        //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
        //这样只要是消息携带的路由键是topic.man,才会分发到该队列
        @Bean
        Binding bindingExchangeMessage() {
            return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
        }
        //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
        // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列(topic后只能有任意个词)
        @Bean
        Binding bindingExchangeMessage2() {
            return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
        }
        //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.*
        // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列(topic后只能有一个词)
        @Bean
        Binding bindingExchangeMessage3() {
            return BindingBuilder.bind(lclQueue()).to(exchange()).with("topic.*");
        }
    }

      2、Producer

      发送三个消息,routingkey分别为topic.man、topic.woman、topic.man.lcl

        @GetMapping("/topic1")
        public String sendTopicMessage1() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "message: M A N ";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> manMap = new HashMap<>();
            manMap.put("messageId", messageId);
            manMap.put("messageData", messageData);
            manMap.put("createTime", createTime);
            rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
            return "ok";
        }
    
        @GetMapping("/topic2")
        public String sendTopicMessage2() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "message: woman is all ";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> womanMap = new HashMap<>();
            womanMap.put("messageId", messageId);
            womanMap.put("messageData", messageData);
            womanMap.put("createTime", createTime);
            rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
            return "ok";
        }
    
        @GetMapping("/topic3")
        public String sendTopicMessage3() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "message: M A N ";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> manMap = new HashMap<>();
            manMap.put("messageId", messageId);
            manMap.put("messageData", messageData);
            manMap.put("createTime", createTime);
            rabbitTemplate.convertAndSend("topicExchange", "topic.man.lcl", manMap);
            return "ok";
        }

      3、Consumer

      创建三个消费者,消费的队列分别为topic.man、topic.woman、topic.man.lcl

    @Component
    @RabbitListener(queues = "topic.man")
    public class TopicManReceiver {
        @RabbitHandler
        public void process(Map testMessage) {
            System.out.println("TopicManReceiver消费者收到消息  : " + testMessage.toString());
        }
    }
    @Component
    @RabbitListener(queues = "topic.woman")
    public class TopicWomanReceiver {
        @RabbitHandler
        public void process(Map testMessage) {
            System.out.println("TopicWomanReceiver消费者收到消息  : " + testMessage.toString());
        }
    }
    @Component
    @RabbitListener(queues = "topic.man.lcl")
    public class TopicLclReceiver {
        @RabbitHandler
        public void process(Map testMessage) {
            System.out.println("TopicLclReceiver消费者收到消息  : " + testMessage.toString());
        }
    }

      4、调用结果

        调用方法一,发送的是topic.man,由于三个队列的routingkey分别为topic.man、topic.#、topic.*,因此三个队列都会有信息路由到。

        调用方法二,发送的是topic.woman,topic.man肯定路由不到,topic.#、topic.*符合路由规则,因此这两个队列都会有信息路由到。

        调用方法三,发送的是topic.man.lcl,只有topic.*被路由到,因此只有这一个队列会有消息。

          

    (三)广播交换机Fanout

      广播交换机的特点就是不需要设置routingkey,直接就可以将交换机的消息路由到绑定的队列上,这里演示同一个交换机绑定多个队列。

      1、创建配置文件

    @Configuration
    public class FanoutRabbitConfig {
        @Bean
        public Queue queueA() {
            return new Queue("fanout.A");
        }
        @Bean
        public Queue queueB() {
            return new Queue("fanout.B");
        }
        @Bean
        public Queue queueC() {
            return new Queue("fanout.C");
        }
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
        @Bean
        Binding bindingExchangeA() {
            return BindingBuilder.bind(queueA()).to(fanoutExchange());
        }
        @Bean
        Binding bindingExchangeB() {
            return BindingBuilder.bind(queueB()).to(fanoutExchange());
        }
        @Bean
        Binding bindingExchangeC() {
            return BindingBuilder.bind(queueC()).to(fanoutExchange());
        }
    }

      2、Producer

        @GetMapping("/fanout")
        public String sendFanoutMessage() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "message: testFanoutMessage ";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);
            rabbitTemplate.convertAndSend("fanoutExchange", null, map);
            return "ok";
        }

      3、Consumer

      分别创建三个消费者,消费绑定的三个队列

    @Component
    @RabbitListener(queues = "fanout.A")
    public class FanoutReceiverA {
        @RabbitHandler
        public void process(Map testMessage) {
            System.out.println("FanoutReceiverA消费者收到消息  : " +testMessage.toString());
        }
    }
    @Component
    @RabbitListener(queues = "fanout.B")
    public class FanoutReceiverB {
        @RabbitHandler
        public void process(Map testMessage) {
            System.out.println("FanoutReceiverB消费者收到消息  : " +testMessage.toString());
        }
    }
    @Component
    @RabbitListener(queues = "fanout.C")
    public class FanoutReceiverC {
        @RabbitHandler
        public void process(Map testMessage) {
            System.out.println("FanoutReceiverC消费者收到消息  : " +testMessage.toString());
        }
    }

      4、演示结果

        发送一条消息后,三个消费端均可以接收

          

     三、消息的可靠性

      消息的可靠性分为投递的可靠性和接收可靠性,也就是生产者如何保证消息发送成功而不丢失,消费者如何可以正确的消费消息。

    (一)消息投递可靠性分析

       1、方案一:使用消息确认监听

          

         发送消息时,将业务数据和消息数据一起入库,并添加消息监听,如果消息被确认接收成功,则更新数据库中消息的状态。同时使用定时任务轮询那些没有被确认的消息,可以重新发送。

        但是这种场景有个问题,就是在大并发的场景下,会对数据库造成很大的压力,从而造成系统瓶颈,这里可以使用数据库的分库分表从而降低数据库的压力。

      2、方案二:消息延迟投递、做二次确认,回调检查

          

         先将消息落库,然后再发送消息,这里发送了两条消息,一条是真正的业务消息,一条是原消息做延迟检查的消息,一般情况下会延迟2-5分钟发送。

        消费者接收到消息后,进行消费,然后消费完成后给予确认

        CallBack服务通过监听器,监听到消费者的确认之后,对消息做最终的存储

        当接收到延迟投递检查时,callback服务监听到检查的消息,开始进行处理,如果已经接收到消费者消费完成的确认信息,则完成本次检查的消费;如果没有收到消费者消费确认信息,这时callback需要做补偿,其会主动发起RPC通信,让生产者再发送一次。

        这么做的目的,其实就是少一次DB操作,从而减少数据库的压力。

    (二)生产者Confirm消息确认与Return机制

      消息确认是指在生产者投递消息后,如果broker收到消息,则会给生产者一个应答,生产者根据应答来确定消息是否已经被发送到broker

      消息的确认是通过异步的方式来确认的。

      消息确认和普通的发送消息有两点区别:需要开启确认模式和增加监听

      1、原生代码模式

            //4 指定我们的消息投递模式: 消息的确认模式
            channel.confirmSelect();
            //5 发送一条消息
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            System.err.println("------- 发送完成 ----------");
            //6 添加一个确认监听
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.err.println("------- 没有 ACK ----------");
                }
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.err.println("------- 收到 ACK -----------");
                }
            });

      2、SpringBoot代码

       (1)application.yml配置文件

    spring:
      #配置rabbitMq 服务器
      rabbitmq:
        host: 192.168.124.8
        port: 5672
        username: lcl
        password: 123456
        #虚拟host 可以不设置,使用server默认host
        virtual-host: /
    
        #消息确认配置项
        #确认消息已发送到交换机(Exchange) 老版本使用配置项 publisher-confirms: true
        publisher-confirm-type: correlated
        #确认消息已发送到队列(Queue)
        publisher-returns: true

      (2)配置代码

    @Configuration
    public class ConfirmRabbitConfig {
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
                    System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
                    System.out.println("ConfirmCallback:     "+"原因:"+cause);
                }
            });
    
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("ReturnCallback:     "+"消息:"+message);
                    System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
                    System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
                    System.out.println("ReturnCallback:     "+"交换机:"+exchange);
                    System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
                }
            });
    
            return rabbitTemplate;
        }
    }

        这里强调一下,一定要加上rabbitTemplate.setMandatory(true);或者在配置文件中设置spring.rabbitmq.template.mandatory=true,return消息机制才会生效。

        可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback; 那么以上这两种回调函数都是在什么情况会触发呢?

        推送消息存在四种情况:

          (1)消息推送到server,但是在server里找不到交换机

          (2)消息推送到server,找到交换机了,但是没找到队列

          (3)消息推送到sever,交换机和队列啥都没找到

          (4)消息推送成功

        那么我先写几个接口来分别测试和认证下以上4种情况,消息确认触发回调函数的情况

        (1)消息推送到server,但是在server里找不到交换机

          写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):

        @GetMapping("/ack")
        public String TestMessageAck() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "message: non-existent-exchange test message ";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);
            rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
            return "ok";
        }

        输出结果:这种情况触发的是 ConfirmCallback 回调函数。并且由于没有对应的交换机,因此返回false

          

         (2)消息推送到server,找到交换机了,但是没找到队列

          这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,由于之前已经在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作

        @Bean
        DirectExchange lonelyDirectExchange() {
            return new DirectExchange("lonelyDirectExchange");
        }

          测试方法:

        @GetMapping("/ack2")
        public String TestMessageAck2() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "message: lonelyDirectExchange test message ";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);
            rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
            return "ok";
        }

          调用结果:

          

         可以看到这种情况,两个函数都被调用了; 这种情况下,消息是推送成功到服务器了的,所以ConfirmCallback对消息确认情况是true; 而在RetrunCallback回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。 

        (3)消息推送到sever,交换机和队列啥都没找到 这种情况其实和(1)一样。

          结论: (3)这种情况触发的是 ConfirmCallback 回调函数。

        (4)消息推送成功

        那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /direct接口

          

         结论: 这种情况触发的是 ConfirmCallback 回调函数。

    (三)消费端ack与重回队列

      和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。

      消息接收的确认机制主要存在三种模式:

        1、自动确认, 这也是默认的消息确认情况。 AcknowledgeMode.NONE RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。 所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。 一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

        2、根据情况确认, 这个不做介绍

        3、手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。

          消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。

          (1)basic.ack用于肯定确认

          (2)basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)

          (3)basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息

      消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。 而basic.nack,basic.reject表示没有被正确处理。

      着重讲下reject,因为有时候一些场景是需要重新入列的。

        channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。

        使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。 但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。

      顺便也简单讲讲 nack,这个也是相当于设置不消费某条消息。

        channel.basicNack(deliveryTag, false, true); 第一个参数依然是当前消息到的数据的唯一id; 第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。 第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。 同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

      1、原生代码

                channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // 获取传送标签
                        // long deliveryTag = envelope.getDeliveryTag();
                        // 确认消息
                        // channel.basicAck(deliveryTag, false);
                        System.out.println("消费的 Body:" + new String(body, "UTF-8"));
                    }
                });

      2、Springboot代码

      (1)添加一个手动确认类

        如果成功,返回肯定确认,如果catch到异常,返回否定确认。

    @Component
    public class MyAckReceiver implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理
                String msg = message.toString();
                String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
                Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(),3);
                String messageId=msgMap.get("messageId");
                String messageData=msgMap.get("messageData");
                String createTime=msgMap.get("createTime");
                System.out.println("  MyAckReceiver  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);
                System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
                channel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
    //            channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
                //根据不同的队列走不同的消费逻辑
                /*if ("TestDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){
                    System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
                    System.out.println("消息成功消费到  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);
                    System.out.println("执行TestDirectQueue中的消息的业务处理流程......");
    
                }
                if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){
                    System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
                    System.out.println("消息成功消费到  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);
                    System.out.println("执行fanout.A中的消息的业务处理流程......");
    
                }*/
            } catch (Exception e) {
                channel.basicReject(deliveryTag, false);
                e.printStackTrace();
            }
        }
        //{key=value,key=value,key=value} 格式转换成map
        private Map<String, String> mapStringToMap(String str,int entryNum ) {
            str = str.substring(1, str.length() - 1);
            String[] strs = str.split(",",entryNum);
            Map<String, String> map = new HashMap<String, String>();
            for (String string : strs) {
                String key = string.split("=")[0].trim();
                String value = string.split("=")[1];
                map.put(key, value);
            }
            return map;
        }

      (2)添加配置代码

    @Configuration
    public class MessageAckListenerConfig {
        @Autowired
        private CachingConnectionFactory connectionFactory;
        @Autowired
        private MyAckReceiver myAckReceiver;//消息接收处理类
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(1);
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
            //设置一个队列
            container.setQueueNames("TestDirectQueue");
            //如果同时设置多个如下: 前提是队列都是必须已经创建存在的
            //  container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
            //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
            //container.setQueues(new Queue("TestDirectQueue",true));
            //container.addQueues(new Queue("TestDirectQueue2",true));
            //container.addQueues(new Queue("TestDirectQueue3",true));
            container.setMessageListener(myAckReceiver);
            return container;
        }
    }

        调用接口验证

          

       (3)说明

        上面的代码演示的是只有一个队列,但是有的业务场景需要设置多个队列手动确认,那么就可以参照注释掉的代码,在确认时,根据不同的队列名进行手动确认,同时在配置时,设置多个队列

    四、消费端消息限流

      消费端限流实际上就是消息在消费端的平稳投递消费,而不会导致一时间大量的消息打入到消费端,给消费端造成压力。

      RabbitMQ 提供了一种 qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consumer 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。

      在原生代码中,对channel进行设置

            /**
             * 限流设置:  prefetchSize:每条消息大小的设置,0是无限制
             * prefetchCount:标识每次推送多少条消息 一般是一条
             * global:false标识channel级别的  true:标识消费者级别的
             */
            channel.basicQos(0,1,false);

      在Springboot中就更简单了,只需要对配置文件做调整即可

    spring:
      #配置rabbitMq 服务器
      rabbitmq:
        listener:
          simple:
            #消费者最小数量
            concurrency: 1
            #消费之最大数量
            max-concurrency: 10
            #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
            prefetch: 1
            # 是否手动确认
            #acknowledge-mode: manual

    五、特殊队列(TTL队列与死信队列)

    (一)TTL队列

      TTL 是 Time To Live 的所写,也就是生存时间,RabbitMQ 支持消息的过期时间,在消息发送时可以进行指定。

      RabbitMQ 支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动地清除。

      生产者在发送时设置消息的有效期

    Builder bd = new AMQP.BasicProperties().builder();
    bd.deliveryMode(2);//持久化
    bd.expiration("100000");//设置消息有效期100秒钟
    BasicProperties pros = bd.build();
    String message = "测试ttl消息";
    channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());

      消费者设置队列中消息的有效期

    //设置队列上所有的消息的有效期,单位为毫秒
    Map<String, Object> argss = new HashMap<String , Object>();
    arguments.put("x-message-ttl " , 5000);//5秒钟
    channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

    (二)死信队列

      死信队列 DLX, Dead-Letter-Exchange,利用 DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新 pulish 到另一个 Exchange,这个 Exchange 就是 DLX。

      当某个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。

      死信队列也叫延时队列。基于TTL模式的延时队列会涉及到2个交换机、2个路由键、2个队列,如下图所示:

        (1)生产者将消息(msg)和路由键(routekey)发送指定的死信交换机(DelayExchange)上

        (2)死信交换机(delayexchange)根据路由键(routekey1)找到绑定自己的死信队列(DelayQueue)并把消息给它

        (3)消息(msg)到期死亡变成死信转发给死信接收交换机(ReceiveExchange)

        (4)死信接收交换机(ReceiveExchange)根据路由键(routekey2)找到绑定自己的死信接收队列(ReceiveQueue)并把消息给它

        (5)死信接收队列(ReceiveQueue)再把消息发送给监听它的消费者(customer)

          

       消息变成死信队列的几种情况:

        1、消息被拒绝(basic.reject / basic.nack)并且 requeue = false 

        2、消息 TTL 过期

        3、队列达到最大长度

      DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

       代码样例:

        1、配置队列绑定

          分别创建死信队列交换机、接收对接交换机、死信队列、接收队列以及绑定关系

    @Configuration
    public class DelayRabbitConfig {
    
        /**
         * 死信交换机
         * @return
         */
        @Bean
        public DirectExchange delayExchange(){
            return new DirectExchange("delay_exchange");
        }
    
        /**
         * 死信队列
         * @return
         */
        @Bean
        public Queue delayQueue(){
            Map<String,Object> map = new HashMap<>(16);
            map.put("x-dead-letter-exchange","receive_exchange");
            map.put("x-dead-letter-routing-key", "receive_key");
            return new Queue("delay_queue",true,false,false,map);
        }
    
        /**
         * 给死信队列绑定交换机
         * @return
         */
        @Bean
        public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange){
            return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay_key");
        }
    
        /**
         * 死信接收交换机
         * @return
         */
        @Bean
        public DirectExchange receiveExchange(){
            return new DirectExchange("receive_exchange");
        }
    
        /**
         * 死信接收队列
         * @return
         */
        @Bean
        public Queue receiveQueue(){
            return new Queue("receive_queue");
        }
    
        /**
         * 死信交换机绑定消费队列
         * @return
         */
        @Bean
        public Binding receiveBinding(Queue receiveQueue,DirectExchange receiveExchange){
            return BindingBuilder.bind(receiveQueue).to(receiveExchange).with("receive_key");
        }

        上面的设置死信队列时的key是固定的,如果不清楚,可以参看以下控制台。

          

         2、Producer

        @GetMapping("/delay")
        public String sendDelayMessage() {
            //这里的消息可以是任意对象,无需额外配置,直接传即可
            String messageData = "sendDelayMessage~~~~~~~~~ ";
            log.info("===============延时队列生产消息====================");
            log.info("发送时间:{},发送内容:{}", LocalDateTime.now(), messageData);
            this.rabbitTemplate.convertAndSend(
                    "delay_exchange",
                    "delay_key",
                    messageData,
                    message -> {
                        //注意这里时间要是字符串形式
                        message.getMessageProperties().setExpiration("60000");
                        return message;
                    }
            );
            log.info("{}ms后执行", 60000);
            return "OK~";
        }

        3、consumer

    @Component
    @Slf4j
    public class DelayConsumer {
    
        @RabbitListener(queues = "receive_queue")
        public void cfgUserReceiveDealy(List<Integer> list, Message message, Channel channel) throws IOException {
            log.info("===============接收队列接收消息====================");
            log.info("接收时间:{},接受内容:{}", LocalDateTime.now(), list.toString());
            //通知 MQ 消息已被接收,可以ACK(从队列中删除)了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            try {
                log.error("============dosomething.....!==============");
            } catch (Exception e) {
                log.error("============消费失败,尝试消息补发再次消费!==============");
                log.error(e.getMessage());
                /**
                 * basicRecover方法是进行补发操作,
                 * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
                 * 设置为false是只补发给当前的consumer
                 */
                channel.basicRecover(false);
            }
        }
    }

        4、验证

          

    ------------------------------------------------------------------
    -----------------------------------------------------------
    ---------------------------------------------
    朦胧的夜 留笔~~
  • 相关阅读:
    SAP基础:定位点运算
    WDA基础十七:ALV不同行显示不同下拉
    WDA基础十六:ALV的颜色
    WEB UI基础八:链接跳转到标准的工单界面
    WDA基础十五:POPUP WINDOW
    CRM创建BP(END USER)
    CRM 员工创建并分配用户
    STRANS一:简单的XML转换
    WEB UI 上传URL附件(使用方法备份)
    FPM四:用OVP做查询跳转到明细
  • 原文地址:https://www.cnblogs.com/liconglong/p/15150370.html
Copyright © 2011-2022 走看看