zoukankan      html  css  js  c++  java
  • 消息队列 RabbitMq(6)高级特性

    一.消息的可靠投递

    在使用RabbitMq的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败的场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性

    rabbitMQ 整个消息投递过程为:

     producer ->  rabbitMQ broker -> exchange -> queue ->consumer

      1.confirm 确认模式

        消息从producer 到exchange 会返回一个 confirmCallback

      2.return 退回模式

        消息从exchange到queue投递失败则会返回一个returnCallbak

    Confirm确认模式:

    1.在配置文件中开启确认模式 publisher-confimrs :true

    spring:
      rabbitmq:
        host: 10.211.55.4
        virtual-host: local
        port: 5672
        username: admin
        password: admin
        publisher-confirms: true

    2.在rabbitTemplate定义ConfirmCallBack回调函数

    @RequestMapping("/producer")
        public void producer(){
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                /**
                 * ack exchange交换机是否收到了消息
                 * cause 失败原因
                 */
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if(ack){
                        System.out.println("confirm方法 成功了");
                    }else{
                        System.out.println("confirm方法 失败了: " + cause);
                    }
                }
            });
    
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.test","spring boot rabbit mq");
        }

    Return退回模式:

    1.开启退回模式

    spring:
      rabbitmq:
        host: 10.211.55.4
        virtual-host: local
        port: 5672
        username: admin
        password: admin
        publisher-returns: true

    2.设置returnCallBack

    @RequestMapping("/return")
        public void returncallback(){
            //设置交换机处理失败消息的模式
            rabbitTemplate.setMandatory(true);
    
            //设置return call back
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 *
                 * @param message   消息对象
                 * @param replyCode 错误码
                 * @param replyText 错误信息
                 * @param exchange  交换机
                 * @param routingKey 路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("return 执行了....");
    
                    System.out.println(message);
                    System.out.println(replyCode);
                    System.out.println(replyText);
                    System.out.println(exchange);
                    System.out.println(routingKey);
    
                    //处理
                }
            });
    
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"zxcasd","return call back");
        }

    二、Consumer ACK 确认机制

    表示消费端收到消息后到确认方式

    有三种确认方式:

    1.自动确认 acknowledge="none" 默认

    2.手动确认 acknowledge="manual"

    3.根据异常情况确认: acknowledge="auto"

    1.开启手动确认

    spring:
      rabbitmq:
        host: 10.211.55.4
        virtual-host: local
        port: 5672
        username: admin
        password: admin
        publisher-returns: true
        listener:
          simple:
            acknowledge-mode: manual

    2.成功调用channel.basicAck   异常调用channel.basicNack

    @Component
    public class RabbitMQListener implements ChannelAwareBatchMessageListener {
    
        @Override
        @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try{
                System.out.println(new String(message.getBody()));
                int a = 1/0;
                //手动确认
                channel.basicAck(deliveryTag,true);
            }catch (Exception ex){
                //发生异常 拒绝确认
                //第三个参数,重回队列
                channel.basicNack(deliveryTag,true,true);
            }
        }
    }

    三、消费端 限流

    1.确保ack机制为手动确认

    2.设置prefetch=1 每次从rabbitmq取几条,ack确认后再取下一条

    server:
      port: 9999
    spring:
      rabbitmq:
        host: 10.211.55.4
        virtual-host: local
        port: 5672
        username: admin
        password: admin
        publisher-returns: true
        listener:
          simple:
            acknowledge-mode: manual
          direct:
            prefetch: 1

    四、TTL

    Time To Live 存活时间/过期时间,当消息到达一定时间后,还没有被消费,会被自动清除。

    RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间。

    1.queue 队列设置过期时间

    @Configuration
    public class RabbitMQConfig {
        public static final String EXCHANGE_NAME = "exchange_ttl";
        public static final String QUEUE_NAME = "queue_ttl";
        public static final String Routing_Key = "ttl.#";
    
        // 1.交换机
        @Bean("bootExchange")
        public Exchange bootExchange() {
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        }
    
    
        //2.Queue队列
        @Bean("bootQueue")
        public Queue bootQueue() {
            return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 5000).build();
        }
    
        //3.绑定交换机和队列
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(Routing_Key).noargs();
        }
    }

    2.消息设置过期时间

    @RequestMapping("/producer")
        public void producer(){
    
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setExpiration("5000"); // 消息过期时间
                    return message;
                }
            };
    
            for(int i = 0;i<10;i++){
                if(i>5){
                    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i,messagePostProcessor);
                }else{
                    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i);
                }
            }
        }

    五、死信队列

    DLX、Dead Letter Exchange (死信交换机),当消息成为Dead Messag后,可以被重新发送到另一台交换机中。

    消息成为死信到三种情况:

    1.队列消息长度达到限制,

    2.消费者拒接消费信息,basicNack/basicReject 并且不把消息重新放入原目标队列,参数requeue=false。

    3.原队列存在消息过期设置,消息到达超时时间未被消费

    设定参数 x-dead-letter-exchange 和 x-dead-letter-routing-key

    @Configuration
    public class RabbitMQConfig {
    
        public static final String EXCHANGE_NAME = "test_exchange_dlx";
        public static final String QUEUE_NAME = "test_queue_dlx";
        public static final String Routing_Key = "test.dlx.#";
    
        public static final String DEAD_EXCHANGE = "exchange_dlx";
        public static final String DEAD_QUEUE = "queue_dlx";
        public static final String DEAD_ROUTINGKEY = "dlx.#";
    
    
        //声明正常的交换机(test_exchange_dlx)和队列(test_queue_dlx)
        //交换机
        @Bean("testExchangeDlx")
        public Exchange testExchangeDlx() {
            return new TopicExchange(EXCHANGE_NAME);
        }
    
        //声明队列
        @Bean("testQueueDlx")
        public Queue testQueueDlx() {
            Map<String, Object> args = new HashMap<>();
            //绑定  死信队列Exchange
            args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            //绑定 死信队列抛出异常重定向队列的routingKey
            args.put("x-dead-letter-routing-key", "dlx.hehe");
            //过期时间
            args.put("x-message-ttl", 5000);
            //最大长度
            args.put("x-max-length", 10);
            return new Queue(QUEUE_NAME,true,false,false,args);
        }
    
        //绑定交换机和队列
        @Bean
        public Binding bindQueueExchange(@Qualifier("testQueueDlx") Queue queue, @Qualifier("testExchangeDlx") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(Routing_Key).noargs();
        }
    
        //声明死信的交换机和队列
        // 死信交换机和普通交换机没有什么区别
        @Bean("deadExchange")
        public Exchange deadExchange() {
            return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();
        }
        @Bean("deadQueue")
        public Queue deadQueue() {
            return QueueBuilder.durable(DEAD_QUEUE).build();
        }
    
        //绑定交换机和队列
        @Bean
        public Binding deadBindQueueExchange(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTINGKEY).noargs();
        }
    
    }

    1、创建交换机,队列

    2、创建死信交换机,队列

    3.在正常队列中绑定死信交换机 即可 超过指定条数 或者超过时间 就会自动进入 deadQueue队列

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try{
                System.out.println(new String(message.getBody()));
                //手动确认
                int i = 1/0; //
    //            Thread.currentThread().sleep(2000);
                channel.basicAck(deliveryTag,true);
            }catch (Exception ex){
                //发生异常 拒绝确认
                //第三个参数,重回队列 requeue = false 
                channel.basicNack(deliveryTag,true,false);
            }
        }

    经测试,手动拒绝 requeue=false直接进 死信队列,  超时后 未处理的条数会进入死信队列, 超过10条的直接进入死信队列。

    六、延迟队列

    消息进入队列不会立即消费,只有到达指定时间后才会消费。

    需求 :

      1.下单后,30分钟未支付,取消订单,回滚库存。

      2.新用户注册成功7天后,发送短信问候。

    使用TTL+死信队列 实现延迟队列的效果,

    队列过期时间设置为30分钟,30分钟后失效到 死信队列中, 消费dlx 死信队列 即可

    七、日志与监控

    rabbitMQ 日志文件存放路径: /var/log/rabbitmq/rabbit@主机名.log

    八、消息追踪

    在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于rabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了链接,而它们与RabbitMQ又采用了不同的确认机制。也有可能是因为交换机与队列之间不同的转发策略,甚至是交换机没有雨任何队列进行绑定,这个时候就需要一个较好的机制跟踪记录消息的投递过程,协助开发人员进行问题的定位。

    在RabbitMQ中可以使用Firehose 和rabbitmq_tracing插件功能来实现消息追踪。

     

    https://blog.csdn.net/u013256816/article/details/76039201

     

    开发 测试环境 调错用, 生产环境会影响性能

     

    九、消息可靠性保障-消息补偿

     

     

    1.首先入库,然后去发送到队列,消费后入库。 完整流程

    2.入库,发送消息失败了,过了10分钟发送延迟消息,监听延迟消息,将消息写入数据库,写一个服务定时检查是否成功,如果没成功 重新发送消息。

     

    十、消息幂等性保障-乐观锁机制

    不管请求一次还是多次请求结果应该是一样的。在MQ中,消费多条相同的消息,得到与消费一次相同的结果。

     

    如果 consumer 挂了, producer 又发了一次  就会队列中 存在两次 消费500元的通知,

    可以在指定版本号的方式,防止sql执行,只更新version为1的 sql

     

    十一、集群搭建

    多服务器用ip区分

    单机多实例用端口区分

     

     

     

     

     

  • 相关阅读:
    Codechef EDGEST 树套树 树状数组 线段树 LCA 卡常
    BZOJ4319 cerc2008 Suffix reconstruction 字符串 SA
    Codechef STMINCUT S-T Mincut (CodeChef May Challenge 2018) kruskal
    Codeforces 316G3 Good Substrings 字符串 SAM
    Codechef CHSIGN Change the Signs(May Challenge 2018) 动态规划
    BZOJ1396 识别子串 字符串 SAM 线段树
    CodeForces 516C Drazil and Park 线段树
    CodeForces 516B Drazil and Tiles 其他
    CodeForces 516A Drazil and Factorial 动态规划
    SPOJ LCS2
  • 原文地址:https://www.cnblogs.com/baidawei/p/13306124.html
Copyright © 2011-2022 走看看