zoukankan      html  css  js  c++  java
  • [MQ]说一说MQ消息积压

    前言

    就是在前不久,白天好好地坐在工位上码代码,运维那边的同事打电话和我说我的服务发的消息太多,造成RMQ集群消息积压了很多,于是连忙去看日志和RMQ的管理页面,好家伙发了20来W条数据。这是令我没想到的,因为以我们产品的用户数量和这个接口的使用频率上来说,这是不大可能的。后来调查到是一个同事在批量处理他们系统的数据,需要走我这个接口,总共大约要处理15W条,但是他连续操作了两次,这样连续的消息发送并且由于消息消费者处理的速率以及消费者的数量并不多,自然消息就积压了。其实我在发送的时候是做了简单的流量控制的,但是还是不够周全。所以晚上到家我就在想着处理方案。

    从消费者还是生产者下手

    这个事情,并且就正常情况而言,也就是没有人像我同事这样操作或者用户恶意调用的情况,是不会出现上述的问题的。事实上用户恶意调用是不成立的,我这里是基础服务,上层服务都做了限流或者安全措施,但无法避免公司内部服务来多次调用,比如我同事同步数据的时候调用很多次。所以这里的问题不在于消费者的消费速度或者消费者的数量,而是应该在我这边做限流,控制发送的速度。所以需要从生产者下手。

    是否需要重发

    如果简单地只做限流,只要用RateLimiter这样的限流器,便可用令牌桶法来进行限流,没有获取到令牌的就丢掉。(令牌桶算法请看->限流算法),但是否需要丢掉消息是需要根据业务来的,经过对业务的考量,不能丢掉,因为直接影响到客户。那么就需要重发机制了。

    重发机制

    我的想法是在本地新增一个线程安全的队列,如果某条消息发送的时候显示太快了,那么把这个消息发送队列里。然后后台有轮询的线程不断从队列中取消息,取出之后再重新发送。当然,每个消息应该有一个延时的时间,所以这个队列应当支持延时。关于延时队列请看->延时队列,这里我采用Java里的DelayQueue来实现。为了线程安全,队列分别有一把写锁和读锁。后台线程如何创建?采用SpringBoot自带的@Async来异步启动。线程内部使用线程池来进行多线程发送。

    代码实现

    延时队列中存放的消息类

    public class DelayTask implements Delayed {
    
    
        /**
         * 存放消息的map
         * */
        private HashMap<String,Object> msg;
    
    
        /**
         * 延时时长+入队时间的值
         * */
        private long dealAt;
    
        public HashMap<String, Object> getMsg() {
            return msg;
        }
    
        public void setMsg(HashMap<String, Object> msg) {
            this.msg = msg;
        }
    
        public DelayTask(long time, HashMap<String,Object> msg){
            this.dealAt = time;
            this.msg = msg;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(
                    dealAt-System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);
        }
    
        @Override
        public int compareTo(Delayed o) {
            if(getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            }else {
                return -1;
            }
        }
    
    	//getter setter
    }
    

    延时队列管理

    public class DelayQueueManager {
    
        /**
         * 延时队列
         * */
        private DelayQueue<DelayTask> queue = null;
    
        private ReentrantLock readLock;
    
        private ReentrantLock writeLock;
    
        private static DelayQueueManager instance = null;
    
        public static DelayQueueManager getInstance(){
            if(instance == null){
                synchronized (DelayQueueManager.class){
                    if(instance == null){
                        instance = new DelayQueueManager();
                    }
                }
            }
            return instance;
        }
    
        private DelayQueueManager(){
            queue = new DelayQueue<>();
            readLock = new ReentrantLock();
            writeLock = new ReentrantLock();
        }
        /**
         * 消息入队列
         * */
        public void saveMsg(long outTime,HashMap<String,Object> msg){
            writeLock.lock();
            try {
                queue.add(new DelayTask(outTime,msg));
            }finally {
                writeLock.unlock();
            }
        }
        /**
         * 消息出队列
         * */
        public HashMap<String,Object> getMsg() throws InterruptedException {
            DelayTask t = null;
            readLock.lock();
            try {
                //阻塞取消息
                t = queue.take();
            }finally {
                readLock.unlock();
            }
            return t.getMsg();
        }
    }
    
    

    重发器

    @Configuration
    @Slf4j
    @EnableAsync
    public class Resender implements DisposableBean{
    
        @Value("${rmq.exchange.name}")
        public String exchangeName;
    
        @Value("${rmq.queue.name}")
        public String queueName;
    
        @Value("${rmq.binding.name}")
        public String bindingName;
        /**
         * 后台线程池
         * */
        private ExecutorService threadPool;
    
        private boolean isStop;
    
    
        public Resender(){
            log.info("构造重发器");
            threadPool = new ThreadPoolExecutor(
                    8,
                    10,
                    100L,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    new ThreadPoolExecutor.DiscardPolicy());
            isStop = false;
        }
    
        @Autowired
        private MQSender sender;
    
        @Async
        public void start() {
            log.info("重发器启动");
            while (!isStop){
    
                try {
                    HashMap<String,Object> msg = DelayQueueManager.getInstance().getMsg();
                    if(msg!=null){
                        log.info("重新发送消息:id="+msg.get("msgId"));
                        String messageId = String.valueOf(msg.get("msgId"));
                        String createTime = String.valueOf(msg.get("createTime"));
                        String msgData = String.valueOf(msg.get("msgData"));
                        threadPool.submit(new Runnable() {
                            @Override
                            public void run() {
                                sender.sendMessage(messageId,msgData,createTime);
                            }
                        });
                    }
                }catch (Exception e){
                    log.info("取消息异常"+e.getMessage());
                }
    
    
            }
        }
    
        @Override
        public void destroy() throws Exception {
            isStop = true;
            log.info("重发器停止");
        }
    
    }
    
    

    真正发送MQ消息的发送器

    @Component
    @Slf4j
    public class MQSender {
    
    
        @Value("${rmq.exchange.name}")
        public String exchangeName;
    
        @Value("${rmq.queue.name}")
        public String queueName;
    
        @Value("${rmq.binding.name}")
        public String bindingName;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private RateLimiter rateLimiter = RateLimiter.create(100);
        /**
         * 发送消息
         * */
        public void sendMessage(String messageId,String msgContent,String createTime){
            HashMap<String,Object> map = new HashMap<String,Object>();
            map.put("msgId",messageId);
            map.put("msgData",msgContent);
            map.put("createTime",createTime);
    
            if(rateLimiter.tryAcquire(1,10,TimeUnit.MILLISECONDS))
            {
                rabbitTemplate.convertAndSend(exchangeName,bindingName,map);
                log.info("发送消息:"+map);
            }
            else
            {
                log.info("发送速率过快,id="+messageId);
                long currentTime = System.currentTimeMillis();
                DelayQueueManager.getInstance().saveMsg(currentTime+10L,map);
            }
        }
    }
    

    测试方法

    简单写一个方法

    @RestController
    public class FlowController {
    
        @Autowired
        private MQSender sender;
    
        @GetMapping("/sendMsg")
        public String sendMessage(){
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "Hello world";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            sender.sendMessage(messageId,messageData,createTime);
            return "Send OK";
        }
    }
    
    

    压力测试

    使用JMeter来进行测试,因为我令牌桶没有设置很大,所以一个线程循环请求1000次接口基本就能看到效果。

    avatar

    观察日志:

    ........
    2021-03-30 00:55:48.547  INFO 13132 --- [pool-1-thread-6] cn.izzer.rmq_producer.sender.MQSender    : 发送消息:{createTime=2021-03-30 00:55:45, msgId=4c0cb97a-147c-495e-a889-54e4ed0c3070, msgData=Hello world}
    2021-03-30 00:55:48.547  INFO 13132 --- [pool-1-thread-7] cn.izzer.rmq_producer.sender.MQSender    : 发送速率过快,id=dd94cc70-2dc2-4614-8d08-169b6064f23d
    2021-03-30 00:55:48.557  INFO 13132 --- [pool-1-thread-5] cn.izzer.rmq_producer.sender.MQSender    : 发送消息:{createTime=2021-03-30 00:55:45, msgId=c4eae4bf-08f5-449c-8f0d-be9de1ab7ec8, msgData=Hello world}
    2021-03-30 00:55:48.558  INFO 13132 --- [         task-1] cn.izzer.rmq_producer.common.Resender    : 重新发送消息:id=dd94cc70-2dc2-4614-8d08-169b6064f23d
    2021-03-30 00:55:48.558  INFO 13132 --- [         task-1] cn.izzer.rmq_producer.common.Resender    : 重新发送消息:id=3575c881-62c0-4e72-9d44-4398753eb39e
    2021-03-30 00:55:48.558  INFO 13132 --- [         task-1] cn.izzer.rmq_producer.common.Resender    : 重新发送消息:id=f0c6eb44-3320-4447-b2fb-356872d11b51
    ........
    

    但是这样还是看不出效果,这里我用编辑器抽取了一条记录从最开始被限流到最终经过几次重发的过程,可以看到这条消息是历经几次才发送成功的,当然这里我设计的可能还不大好,因为这个过程理论上应该要更加平滑才是合理的,可以适当增加延时时间,或者动态设置延时时间可能效果更好,不过这个方案我可能得再思考一下:

    avatar

    感谢观看,上面我可能有考虑不好的地方,希望有经验的大佬可以顺便指点指点,谢谢。

  • 相关阅读:
    内存溢出与内存泄露的区别
    <a>标签
    mac上的设置查看环境变量
    css-position
    css-overflow
    css-clear
    mongodb基本操作
    idea使用maven install命令打包(springboot),jar运行时出现没有主清单属性
    linux运行jar报错
    maven deploy时报错
  • 原文地址:https://www.cnblogs.com/Yintianhao/p/14595136.html
Copyright © 2011-2022 走看看