zoukankan      html  css  js  c++  java
  • 消息中间件-RabbitMq(可靠性方案&死信队列&监控)

    消息中间件-RabbitMq(可靠性方案&死信队列&监控)

    上一章节聊到,他有三个重要的部分,【生产者】、【blocker(rabbit节点】、【消费者】 ,换言之,我们保障了发送可靠性、存储可靠性、消费可靠性,也就保证了消息可靠性。下面会出一个消息可靠性的方案,有时候我们需要对一个超时订单做处理,我们可以使用rabbit的死信队列做这个,下面我们也会聊聊死信队列,已经对rabbit进行监控。

    可靠性思考&方案

    一般消息可靠性分为三个级别:

    【最多一次(rabbit支持):消息可能会丢失,但是不会重复传输。那需要->

    •   消费者就需要开启事务机制或者confirm机制(rabbit提供的,当消息发送到消费者的时候,生产端有会收到异步回调,从而知道正确投递),以此保障可以传递到mq中
    •        消费者需要备份交换机,确定消息可以从交换机路由到队列中,这样就可以让消息不丢失
    •   或者通过mandatory属性(rabbit提供,当消息无法找到相关的队列,那就mq就返回没有投递成功的消息给生产者)

    【最少一次rabbit支持;消息绝对不会丢失,但是可能重复传输 那需要->

    •   无需考虑最多一次所需要考虑的东西,消费者发一次,但是这样很难保证成功。

    【恰好一次rabbit不支持:每条消息会传输一次且只传输一次

    下面结合给出一个解决方案(这个性能不太好,因为要走下面的流程,而且还要对数据库进行操作):

    • 【发送消息的时候】:把发送的消息保存在数据库中
    • 【当消息发送到rabbit上】:在rabbit上有一个监听器
    • 【当把消息发送到生产者这里的时候】:消费者给一个回复(ack)给mq(mq通知生产者,这样就知道消息投递成功)
    • 【更新消息表中的消息状态 】
      •   这个时候我们要有一个定时任务去检查数据库中的消息状态(查看是否是发送成功状态(发送成功会在消费端对数据库的数据状态进行修改))
        • 消息状态为【未知:把消息拿过来进行重发,并且记录消息重发的次数(因为不能一直重发,这样服务器压力大),同时修改状态为定时任务干预,你可以设置一个阈值,到达一定次数后,改变状态为人工干预。
        •  消息状态为【人工干预:这个时候就要人工进行处理异常状态的信息

    springBoot中confirm的实现->【生产端

    这里是监听功能

    @Component
    public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
        /*** @param correlationData
         *  相关配置信息
         *  @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
         *  @param cause 失败原因 */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack){
                System.out.println("成功发送");
            }else {
                System.out.println("失败"+cause);
                //这里进行重发,或者自己的业务处理
            }
        }
    }

    交换机和队列进行绑定

    /*** 队列与交换机绑定 */
    @Configuration
    public class QueueConfig {
        @Bean(name = "confirmTestQueue")
        public Queue confirmTestQueue() {
            return new Queue("confirm_test_queue", true, false, false);
        }
    
    
        @Bean(name = "confirmTestExchange")
        public FanoutExchange confirmTestExchange() {
            return new FanoutExchange("confirmTestExchange");
        }
    
        @Bean
        public Binding confirmTestFanoutExchangeAndQueue(@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
            return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
        }
    }
    View Code

    生产者发送消息

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = SpringBootForkJoinApplication.class)
    public class Producer {
        @Autowired
        private RabbitTemplate rabbitTemplate; //注入rabbitmq对象
        @Autowired
        private ConfirmCallbackService confirmCallbackService; //注入 ConfirmCallback对象
    
        @Test
        public void test() {
            rabbitTemplate.setConfirmCallback(confirmCallbackService);
            //发送消息
            rabbitTemplate.convertAndSend("dlq_exchange1", "", "hello,ConfirmCallback你好");
        }
    }
    View Code

    自动回调生产者

     springBoot配置文件

    spring:
      rabbitmq:
        publisher-confirm-type: correlated # 开启confirm确认模式
    
    

    消费端(进行手动签收)

    @Component
    @RabbitListener(queues = "confirm_test_queue")
    public class ReceiverMessage {
        @RabbitHandler
        public void processHandler(String msg, Channel channel, Message message) throws IOException {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {System.out.println("消息内容===>" + new String(message.getBody()));
                //TODO 具体业务逻辑 //手动签收[参数1:消息投递序号,参数2:批量签收]
                channel.basicAck(deliveryTag, true);
            } catch (Exception e) {
                //拒绝签收[参数1:消息投递序号,参数2:批量拒绝,参数3:是否重新加入队列]
                channel.basicNack(deliveryTag, true, true); }
        }
    
    }

    死信交换机(DLX【dead-letter-exchange】)

    就是,当消息成为死消息后,可以被重新发送到另一个队列,这个队列就是死信队列(他和别的队列没有区别),绑定这个队列的交换机就是DLX,当存在有一个死信消息的时候,rebbit会将这个消息发送到死信交换机上,从而路由到死信队列上

    消息变成死消息一般有这几种情况:

    • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
    • 消息TTL过期
    • 队列达到最大长度

    TTL(Time-To-Live):表示的是你发送消息的有效期,一般有两种设置方式

    • 声明队列的时候,在队列的属性中设置
    • 发送消息时给消息设置属性
      •  
    • 死信交换机绑定(对于死信队列,他是一个正常队列,我们按照之前的章节中设置的方法设置就行)

    模拟订单支付流程【使用死信队列】

     一个正常的支付消息发送到交换机->交换机路由到,支付消息应有的队列中->这个时候用户超时还没有进行消费(TTL)->支付消息变成了死信消息->死信消息进入了死信交换机->死信交换机把死信消息路由到死信队列中->对死信队列进行监听->发信息通知用户,或者修改订单状态 

    进行监控

    在真正的环境中,我们需要实时了解到我们mq的状态和情况,从而保障他可以完美的提供服务,以下给出3中常见方案:

    使用management】:这个一般在小项目中进行使用,【简单,但是麻烦,因为如果是集群可能要开多个窗口】

    使用他给的api:】 他自己能把数据展示出来到management上,肯定调用了http接口,那我们也可以这样干【需要开发,并且要看相关文档】,常用的api如下:

    • 概括信息:http://localhost:15672/api/overview
    • channel 列表:http://localhost:15672/api/channels节点信息:http://localhost:15672/api/nodes
    • 交换机信息:http://localhost:15672/api/exchanges
    • 队列信息:http://localhost:15672/api/queues
    • vhost 列表:http://localhost:15672/api/vhosts

    prometheus + grafana 监控rabbitmq

    zabbix不同,zabbix是主动接受消息,而普罗米修斯(prometheus )是定时对咱们的消息进行拉取的,他制定了一些规范【exporter】,各个想让他监控的中间件都要实现自己的exporter,并且他要求传递给他的都是json的数据,

    我们把rabbit提供的普罗米修斯的插件启动:rabbitmq-plugins enable rabbitmq_prometheus

    同时开启端口:firewall-cmd --zone=public --add-port=15692/tcp --permanent

    访问普罗米修斯可以认识的数据:http://你的ip:15692/metrics

    剩下的就交给普罗米修斯了

    grafana 中搜索相关的模板进行导入(https://grafana.com/grafana/dashboards?search=rabbitmq)

     最终效果(中间的过程太繁琐了,咱们只是聊聊这个东西,具体可以参考别的博文。)

     这里是官方对Prometheus集成获取指标的文档 https://www.rabbitmq.com/prometheus.html

    常见问题

    【延迟队列】

    实际上还是可以用上面的死信队列的流程进行实现,设置队列过期时间给你想要的延迟时间,当延迟时间过后,进入死信队列,这个时候执行真正的逻辑

    【消息积压】

    前面说到当内存到一定程度(0.4)的时候,rabbit不会接受消息,我们说过可以通过配置文件对这个数据临时增大,这个时候rabbit运行正常,我们就可以增加消费者对数据进行消费,这样就可以解决积压问题。

    【不重复消费】:

    每个消费者给一个唯一id,使用id进行判断

     

  • 相关阅读:
    冒泡排序与选择排序
    SVN-cheanup反复操作失败的问题。
    js区分汉字和字符,校验长度
    maven的安装与使用
    java获取登陆用户的IP地址
    kafka创建topics 错误: 找不到或无法加载主类 FilesJavajdk1.7.0_80lib;C:Program
    SOAPwebservice 与Restfull webservice之间的区别
    CAD数据导入Arcgis10.1的依赖关系
    wpf之StackPanel、WrapPanel、WrapPanel之间的关系
    浅谈修饰符
  • 原文地址:https://www.cnblogs.com/UpGx/p/15006057.html
Copyright © 2011-2022 走看看