zoukankan      html  css  js  c++  java
  • ActiveMQ 笔记(八)高级特性和大厂常考重点

    个人博客网:https://wushaopei.github.io/    (你想要这里多有)

    1、可用性保证

    引入消息队列之后该如何保证其高可用性?

    持久化、事务、签收、 以及带复制的 Leavel DB + zookeeper 主从集群搭建  

    2、异步投递Async Sends

    2.1  异步投递的定义

           对于一个Slow Consumer,使用同步发送消息可能出现Producer堵塞的情况,慢消费者适合使用异步发送

    如图介绍:

    2.2 什么是异步投递

            ActiveMQ支持同步,异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎么样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著提高发送的性能。
            ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
            如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer知道broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
    很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
     
    异步发送

    • 它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,
    • 就是需要消耗更多的Client端内存同时也会导致broker端性能消耗增加;
    • 此外它不能有效的确保消息的发送成功。在userAsyncSend=true的情况下客户端需要容忍消息丢失的可能。

    2.3 官网配置参考(投递开启)

    在高性能要求下,可以使用异步提高producer 的性能。但会消耗较多的client 端内存,也不能完全保证消息发送成功。在 useAsyncSend = true 情况下容忍消息丢失。

        //  开启异步投递
    
        activeMQConnectionFactory.setUseAsyncSend(true);    

    2.4 面试题: 异步消息如何确定发送成功?

    异步发送丢失消息的场景是:生产者设置userAsyncSend=true,使用producer.send(msg)持续发送消息。

    • 如果消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。
    • 如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。 
    • 所以,正确的异步发送方法是需要接收回调的。

    同步发送和异步发送的区别就在此,

    • 同步大宋等send不阻塞了就表示一定发送成功了,
    • 异步发送需要客户端回执并由客户端再判断一次是否发送成功

    具体实现的代码案例:

    JmsProduce_AsyncSend   在代码中接收回调的函数 :

    package com.demo;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQMessageProducer;
    import org.apache.activemq.AsyncCallback;
    
    import javax.jms.*;
    import java.util.UUID;
    
    public class Producer {
        private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
        private static final String ACTIVEMQ_QUEUE_NAME = "Queue-异步投递回调";
    
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
            activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
            //开启异步投递
            activeMQConnectionFactory.setUseAsyncSend(true);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
            //向上转型到ActiveMQMessageProducer
            ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
    
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("message-" + i);
                textMessage.setJMSMessageID(UUID.randomUUID().toString() + "----orderAtguigu");
                String textMessageId = textMessage.getJMSMessageID();
                //使用ActiveMQMessageProducer的发送消息,可以创建回调
                activeMQMessageProducer.send(textMessage, new AsyncCallback() {
                    @Override
                    public void onSuccess() {
                        System.out.println(textMessageId + "发送成功");
                    }
    
                    @Override
                    public void onException(JMSException exception) {
                        System.out.println(textMessageId + "发送失败");
                    }
                });
            }
            activeMQMessageProducer.close();
            session.close();
            connection.close();
        }
    }
    

    3、延时投递和定时投递

    3.1 在配置文件中设置定时器开关 为 true

    3.2 代码编写

    • Java 代码中封装的辅助消息类型 ScheduleMessage
    • 可以设置的 常用参数 如下: 

    long delay = 3 * 1000 ;
    long perid = 4 * 1000 ;
    int repeat = 7 ;
    
    for (int i = 1; i < 4 ; i++) {
        TextMessage textMessage = session.createTextMessage("delay msg--" + i);
    
        // 消息每过 3 秒投递,每 4 秒重复投递一次 ,一共重复投递 7 次
        textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
        textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);
        textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
    
        messageProducer.send(textMessage);
    
    }

    4、 Activemq的消息重试机制

    (1)由消息丢失或延时而引发的问题——常见面试题

    具体哪些情况会引发消息重发

    1. Client用了transactions且再session中调用了rollback
    2. Client用了transactions且再调用commit之前关闭或者没有commit
    3. Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

     
    请说说消息重发时间间隔和重发次数

    • 间隔:1
      次数:6
       每秒发6次

    有毒消息Poison ACK

    • 一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(私信队列)。

    重试机制的注意事项:

    1. 最多六次还没发出就会
    2. 加入DLQ (死信队列)

    官网链接地址: http://activemq.apache.org/redelivery-policy

    (2)简单案例说明:

    (3)重发整合Spring 的配置

    (4)重试机制的属性说明:

    5、死信队列

    (1)概念: Activemq中引入了“死信队列”(Dead Letter Queue) 的概念。即一条消息再被重发了多次后(默认为重发6次redeliveryCounter==6),将会被Activemq移入“死信队列”,开发人员可以在这个Queue中查看处理出错的消息,进行人工干预。

    (2)死信队列的使用:处理失败的消息

    (3)死信队列的配置介绍

    • SharedDeadLetterStrategy

    • IndividualDeadLetterStrategy

    • 配置案例

      a. 自动删除过期消息

    b. 存放非持久消息到死队列中

    在业务逻辑中,如果一个订单系统没有问题,则使用正常的业务队列,当出现问题,则加入死信队列 ,此时可以选择人工干预还是机器处理 。

    6、 如何保证消息不被重复消费,幂等性的问题

    如果消息是做数据库的插入操作,给这个消息一个唯一的主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据 。

    如果不是,可以用redis 等的第三方服务,给消息一个全局 id ,只要消费过的消息,将 id ,message 以 K-V 形式写入 redis ,那消费者开始消费前,先去 redis 中查询有没消费的记录即可。


  • 相关阅读:
    Kubernetes 集成研发笔记
    Rust 1.44.0 发布
    Rust 1.43.0 发布
    PAT 甲级 1108 Finding Average (20分)
    PAT 甲级 1107 Social Clusters (30分)(并查集)
    PAT 甲级 1106 Lowest Price in Supply Chain (25分) (bfs)
    PAT 甲级 1105 Spiral Matrix (25分)(螺旋矩阵,简单模拟)
    PAT 甲级 1104 Sum of Number Segments (20分)(有坑,int *int 可能会溢出)
    java 多线程 26 : 线程池
    OpenCV_Python —— (4)形态学操作
  • 原文地址:https://www.cnblogs.com/wushaopei/p/12288875.html
Copyright © 2011-2022 走看看