zoukankan      html  css  js  c++  java
  • 消息队列报 堆溢出解决方案

    参考文档:

    https://www.jianshu.com/p/d05c488b84cc

     /**
         * 用于周期性监控线程池的运行状态
         */
        private final ScheduledExecutorService scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());
        /**
         * 自定义异步线程池
         * (1)任务队列使用有界队列
         * (2)自定义拒绝策略
         */
        private final ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(consumerThreadNum, consumerThreadNum, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(consumerThreadNum),
                                   new BasicThreadFactory.Builder().namingPattern("Thread-mns-ali-consumer-%d").build(),
                                   (r, executor) -> log.error("the async executor pool is full!!"));

        @PostConstruct
        public void processMessage() {
            queue = client.getQueueRef();
            /**
             * 开启多个线程消费消息
             */
            for (int i = 0; i < consumerThreadNum; i++) {
                threadPoolExecutor.submit(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                             process();
                        } catch (Exception e) {
                          
                        }
                       
                    }
                }, "Thread-consumer-" + i));
            }
            
            
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                /**
                 * 线程池需要执行的任务数
                 */
                long taskCount = threadPoolExecutor.getTaskCount();
                /**
                 * 线程池在运行过程中已完成的任务数
                 */
                long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
                /**
                 * 曾经创建过的最大线程数
                 */
                long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
                /**
                 * 线程池里的线程数量
                 */
                long poolSize = threadPoolExecutor.getPoolSize();
                /**
                 * 线程池里活跃的线程数量
                 */
                long activeCount = threadPoolExecutor.getActiveCount();
                
                for (; activeCount < consumerThreadNum; activeCount++) {
                    threadPoolExecutor.submit(new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                 process();
                            } catch (Exception e) {
                               
                            }
                           
                        }
                    }, "Thread-consumer-" + activeCount));
                }
            }, 0, 10, TimeUnit.MINUTES);
            
            
        }

        private void process() {

            while (!Thread.currentThread().isInterrupted()) {
                try {

                    
                    
                    /**
                     * dequeue from queue
                     */
                    Message message = queue.popMessage();

                    /**
                     * no enough message
                     */
                    if (message == null) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        } //没有消息就休眠1秒
                        continue;
                    }

                    try {
                        String body = message.getMessageBodyAsString("UTF-8");
                        MnsMessage mnsMessage = GsonUtils.jsonToBean(body, MnsMessage.class);
                        
                        Thread.sleep(2000); //没有消息就休眠1秒
                        
                        handleHouse(message);
                        //处理成功删除消息
                        queue.deleteMessage(message.getReceiptHandle());
                    } catch (Exception e) {
                        if(e instanceof BaseException){ //监听是不是超时异常
                          }
                        //处理异常,删除消息
                        queue.deleteMessage(message.getReceiptHandle());
                    }finally{
                        // 最后执行MDC删除
                        MDC. remove(Common.SESSION_TOKEN_KEY);
                    }
                } catch (Exception e) {
                  
                }
            }
        }

  • 相关阅读:
    CentOS在线安装RabbitMQ3.7
    php redis的GEO地理信息类型
    (PHP)redis Zset(有序集合 sorted set)操作
    (PHP)redis Set(集合)操作
    (PHP)redis Hash(哈希)操作
    (PHP)redis List(列表)操作
    php cURL error 60
    go build 不同系统下的可执行文件
    windows下改变go的gopath
    线程池简要学习[转]
  • 原文地址:https://www.cnblogs.com/QAZLIU/p/9982986.html
Copyright © 2011-2022 走看看