zoukankan      html  css  js  c++  java
  • 消息队列 RabbitMq(6)高级特性

    一.消息的可靠投递

    在使用RabbitMq的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败的场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性

    rabbitMQ 整个消息投递过程为:

     producer ->  rabbitMQ broker -> exchange -> queue ->consumer

      1.confirm 确认模式

        消息从producer 到exchange 会返回一个 confirmCallback

      2.return 退回模式

        消息从exchange到queue投递失败则会返回一个returnCallbak

    Confirm确认模式:

    1.在配置文件中开启确认模式 publisher-confimrs :true

    spring:
      rabbitmq:
        host: 10.211.55.4
        virtual-host: local
        port: 5672
        username: admin
        password: admin
        publisher-confirms: true

    2.在rabbitTemplate定义ConfirmCallBack回调函数

    @RequestMapping("/producer")
        public void producer(){
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                /**
                 * ack exchange交换机是否收到了消息
                 * cause 失败原因
                 */
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if(ack){
                        System.out.println("confirm方法 成功了");
                    }else{
                        System.out.println("confirm方法 失败了: " + cause);
                    }
                }
            });
    
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.test","spring boot rabbit mq");
        }

    Return退回模式:

    1.开启退回模式

    spring:
      rabbitmq:
        host: 10.211.55.4
        virtual-host: local
        port: 5672
        username: admin
        password: admin
        publisher-returns: true

    2.设置returnCallBack

    @RequestMapping("/return")
        public void returncallback(){
            //设置交换机处理失败消息的模式
            rabbitTemplate.setMandatory(true);
    
            //设置return call back
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 *
                 * @param message   消息对象
                 * @param replyCode 错误码
                 * @param replyText 错误信息
                 * @param exchange  交换机
                 * @param routingKey 路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("return 执行了....");
    
                    System.out.println(message);
                    System.out.println(replyCode);
                    System.out.println(replyText);
                    System.out.println(exchange);
                    System.out.println(routingKey);
    
                    //处理
                }
            });
    
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"zxcasd","return call back");
        }

    二、Consumer ACK 确认机制

    表示消费端收到消息后到确认方式

    有三种确认方式:

    1.自动确认 acknowledge="none" 默认

    2.手动确认 acknowledge="manual"

    3.根据异常情况确认: acknowledge="auto"

    1.开启手动确认

    spring:
      rabbitmq:
        host: 10.211.55.4
        virtual-host: local
        port: 5672
        username: admin
        password: admin
        publisher-returns: true
        listener:
          simple:
            acknowledge-mode: manual

    2.成功调用channel.basicAck   异常调用channel.basicNack

    @Component
    public class RabbitMQListener implements ChannelAwareBatchMessageListener {
    
        @Override
        @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try{
                System.out.println(new String(message.getBody()));
                int a = 1/0;
                //手动确认
                channel.basicAck(deliveryTag,true);
            }catch (Exception ex){
                //发生异常 拒绝确认
                //第三个参数,重回队列
                channel.basicNack(deliveryTag,true,true);
            }
        }
    }

    三、消费端 限流

    1.确保ack机制为手动确认

    2.设置prefetch=1 每次从rabbitmq取几条,ack确认后再取下一条

    server:
      port: 9999
    spring:
      rabbitmq:
        host: 10.211.55.4
        virtual-host: local
        port: 5672
        username: admin
        password: admin
        publisher-returns: true
        listener:
          simple:
            acknowledge-mode: manual
          direct:
            prefetch: 1

    四、TTL

    Time To Live 存活时间/过期时间,当消息到达一定时间后,还没有被消费,会被自动清除。

    RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间。

    1.queue 队列设置过期时间

    @Configuration
    public class RabbitMQConfig {
        public static final String EXCHANGE_NAME = "exchange_ttl";
        public static final String QUEUE_NAME = "queue_ttl";
        public static final String Routing_Key = "ttl.#";
    
        // 1.交换机
        @Bean("bootExchange")
        public Exchange bootExchange() {
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        }
    
    
        //2.Queue队列
        @Bean("bootQueue")
        public Queue bootQueue() {
            return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 5000).build();
        }
    
        //3.绑定交换机和队列
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(Routing_Key).noargs();
        }
    }

    2.消息设置过期时间

    @RequestMapping("/producer")
        public void producer(){
    
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setExpiration("5000"); // 消息过期时间
                    return message;
                }
            };
    
            for(int i = 0;i<10;i++){
                if(i>5){
                    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i,messagePostProcessor);
                }else{
                    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i);
                }
            }
        }

    五、死信队列

    DLX、Dead Letter Exchange (死信交换机),当消息成为Dead Messag后,可以被重新发送到另一台交换机中。

    消息成为死信到三种情况:

    1.队列消息长度达到限制,

    2.消费者拒接消费信息,basicNack/basicReject 并且不把消息重新放入原目标队列,参数requeue=false。

    3.原队列存在消息过期设置,消息到达超时时间未被消费

    设定参数 x-dead-letter-exchange 和 x-dead-letter-routing-key

    @Configuration
    public class RabbitMQConfig {
    
        public static final String EXCHANGE_NAME = "test_exchange_dlx";
        public static final String QUEUE_NAME = "test_queue_dlx";
        public static final String Routing_Key = "test.dlx.#";
    
        public static final String DEAD_EXCHANGE = "exchange_dlx";
        public static final String DEAD_QUEUE = "queue_dlx";
        public static final String DEAD_ROUTINGKEY = "dlx.#";
    
    
        //声明正常的交换机(test_exchange_dlx)和队列(test_queue_dlx)
        //交换机
        @Bean("testExchangeDlx")
        public Exchange testExchangeDlx() {
            return new TopicExchange(EXCHANGE_NAME);
        }
    
        //声明队列
        @Bean("testQueueDlx")
        public Queue testQueueDlx() {
            Map<String, Object> args = new HashMap<>();
            //绑定  死信队列Exchange
            args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            //绑定 死信队列抛出异常重定向队列的routingKey
            args.put("x-dead-letter-routing-key", "dlx.hehe");
            //过期时间
            args.put("x-message-ttl", 5000);
            //最大长度
            args.put("x-max-length", 10);
            return new Queue(QUEUE_NAME,true,false,false,args);
        }
    
        //绑定交换机和队列
        @Bean
        public Binding bindQueueExchange(@Qualifier("testQueueDlx") Queue queue, @Qualifier("testExchangeDlx") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(Routing_Key).noargs();
        }
    
        //声明死信的交换机和队列
        // 死信交换机和普通交换机没有什么区别
        @Bean("deadExchange")
        public Exchange deadExchange() {
            return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();
        }
        @Bean("deadQueue")
        public Queue deadQueue() {
            return QueueBuilder.durable(DEAD_QUEUE).build();
        }
    
        //绑定交换机和队列
        @Bean
        public Binding deadBindQueueExchange(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTINGKEY).noargs();
        }
    
    }

    1、创建交换机,队列

    2、创建死信交换机,队列

    3.在正常队列中绑定死信交换机 即可 超过指定条数 或者超过时间 就会自动进入 deadQueue队列

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try{
                System.out.println(new String(message.getBody()));
                //手动确认
                int i = 1/0; //
    //            Thread.currentThread().sleep(2000);
                channel.basicAck(deliveryTag,true);
            }catch (Exception ex){
                //发生异常 拒绝确认
                //第三个参数,重回队列 requeue = false 
                channel.basicNack(deliveryTag,true,false);
            }
        }

    经测试,手动拒绝 requeue=false直接进 死信队列,  超时后 未处理的条数会进入死信队列, 超过10条的直接进入死信队列。

    六、延迟队列

    消息进入队列不会立即消费,只有到达指定时间后才会消费。

    需求 :

      1.下单后,30分钟未支付,取消订单,回滚库存。

      2.新用户注册成功7天后,发送短信问候。

    使用TTL+死信队列 实现延迟队列的效果,

    队列过期时间设置为30分钟,30分钟后失效到 死信队列中, 消费dlx 死信队列 即可

    七、日志与监控

    rabbitMQ 日志文件存放路径: /var/log/rabbitmq/rabbit@主机名.log

    八、消息追踪

    在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于rabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了链接,而它们与RabbitMQ又采用了不同的确认机制。也有可能是因为交换机与队列之间不同的转发策略,甚至是交换机没有雨任何队列进行绑定,这个时候就需要一个较好的机制跟踪记录消息的投递过程,协助开发人员进行问题的定位。

    在RabbitMQ中可以使用Firehose 和rabbitmq_tracing插件功能来实现消息追踪。

     

    https://blog.csdn.net/u013256816/article/details/76039201

     

    开发 测试环境 调错用, 生产环境会影响性能

     

    九、消息可靠性保障-消息补偿

     

     

    1.首先入库,然后去发送到队列,消费后入库。 完整流程

    2.入库,发送消息失败了,过了10分钟发送延迟消息,监听延迟消息,将消息写入数据库,写一个服务定时检查是否成功,如果没成功 重新发送消息。

     

    十、消息幂等性保障-乐观锁机制

    不管请求一次还是多次请求结果应该是一样的。在MQ中,消费多条相同的消息,得到与消费一次相同的结果。

     

    如果 consumer 挂了, producer 又发了一次  就会队列中 存在两次 消费500元的通知,

    可以在指定版本号的方式,防止sql执行,只更新version为1的 sql

     

    十一、集群搭建

    多服务器用ip区分

    单机多实例用端口区分

     

     

     

     

     

  • 相关阅读:
    nginx配置
    day5 业务开发中较有用代码
    day4 Vue基础
    npm vue的一些命令
    day3 ES6基础
    python_矩阵的加法和乘法计算(包括矩阵的动态输入,纯列表实现不引入其他模块)
    python_利用元组实现剪刀石头布
    python_整型与IP地址的转换
    python_判断标识符的合法性
    python_生成随机数与列表排序
  • 原文地址:https://www.cnblogs.com/baidawei/p/13306124.html
Copyright © 2011-2022 走看看