zoukankan      html  css  js  c++  java
  • Java如何让线程池满后再存放队列

    1.线程池源码分析:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler){};

    核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、超出核心线程数量的存活时间(keepAliveTime)、

    超出核心线程数量的存活时间单位(unit)、存放任务队列(workQueue)、执行程序创建新线程时使用的工厂(threadFactory)、当线程边界和队列容量达到时拒绝策略(handler)

    正常线程池工作流程

    1:当提交的任务小于核心线程池数量的时候,使用线程池中的核心线程。

    2:当提交的任务大于线程池中核心线程数量的时候,会将新任务存放到队列中。

    3:当队列存满后,会开启新线程直到达到设置的最大线程池数量。

    4:当队列存满后,且线程池中的最大线程数量达到最大的时候,这时候在提交过来任务,直接采用线程池设置的拒绝策略。

    2.场景分析

    由上面可得,如果队列在没有存满的情况下我们的最大线程数量是没有开启的,这时候并没有达到我们想要的多线程的效果。所以我们需要改写一下逻辑

    1:自定义线程池继承ThreadPoolExecutor类,改写核心的逻辑。

    2:自定义队列继承LinkedBlockingQueue,改写 offer 方法。

    自定义队列方法:

    package com.example.util;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description: 线程池工作队列重写
     * @Author: mingtian
     * @CreateDate: 2021/4/9 13:22
     * @Version: 1.0
     */
    public class TaskQueue<Runnable> extends LinkedBlockingQueue<Runnable> {
    
        /**
         * 打印日志
         */
        private static Logger logger = LoggerFactory.getLogger(TaskQueue.class);
    
        /**
         * 自定义的线程池类,继承自ThreadPoolExecutor
         */
        private CustomThreadPoolExecutor threadPoolExecutor;
    
        public TaskQueue(int capacity) {
            super(capacity);
        }
    
        /**
         * 对象赋值
         *
         * @param customThreadPoolExecutor
         */
        public void setExecutor(CustomThreadPoolExecutor customThreadPoolExecutor) {
            threadPoolExecutor = customThreadPoolExecutor;
        }
    
        /**
         * offer方法的含义是:将任务提交到队列中,返回值为true/false,分别代表提交成功/提交失败。
         * 作用:TaskQueue的offer返回值来决定是否创建更多的线程,达到先判断maximumPoolSize再判断队列的目的
         *
         * @param runnable
         * @return
         */
        @Override
        public boolean offer(Runnable runnable) {
            if (threadPoolExecutor == null) {
                throw new RejectedExecutionException("The task queue does not have executor!");
            }
            // 线程池的当前线程数
            int currentPoolThreadSize = threadPoolExecutor.getPoolSize();
            if (threadPoolExecutor.getSubmittedTaskCount() < currentPoolThreadSize) {
                // 已提交的任务数量小于当前线程数,意味着线程池中有空闲线程,直接扔进队列里,让线程去处理
                return super.offer(runnable);
            }
    
            // return false to let executor create new worker.
            if (currentPoolThreadSize < threadPoolExecutor.getMaximumPoolSize()) {
                // 重点: 当前线程数小于 最大线程数 ,返回false,暗含入队失败,让线程池去创建新的线程
                return false;
            }
            // 重点: 代码运行到此处,说明当前线程数 >= 最大线程数,需要真正的提交到队列中
            return super.offer(runnable);
        }
    
        /**
         * 重试 在线程池没有关闭的状态时 将任务存放到队列中
         *
         * @param o
         * @param timeout
         * @param unit
         * @return
         * @throws InterruptedException
         */
        public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            if (threadPoolExecutor.isShutdown()) {
                logger.error("threadPoolExecutor is shutdown!!!");
                throw new RejectedExecutionException("Executor is shutdown!");
            }
            return super.offer(o, timeout, unit);
        }
    }

    自定义线程池类:

    package com.example.util;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @Description: 自定义线程池 重写线程池执行顺序
     * @Author: mingtian
     * @CreateDate: 2021/4/9 13:21
     * @Version: 1.0
     */
    public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    
        /**
         * 打印日志
         */
        private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutor.class);
    
        /**
         * 定义一个成员变量,用于记录当前线程池中已提交的任务数量
         */
        private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
    
        /**
         * 自定义线程池
         *
         * @param corePoolSize    核心线程池数量
         * @param maximumPoolSize 最大线程池数量
         * @param keepAliveTime   超过核心线程池数量存活时间
         * @param unit            超过核心线程池数量存活时间单位
         * @param workQueue       存放任务的队列
         * @param threadFactory   线程工厂 可以定义线程池名称
         * @param handler         当队列满时执行拒绝策略
         */
        public CustomThreadPoolExecutor(int corePoolSize,
                                        int maximumPoolSize,
                                        long keepAliveTime,
                                        TimeUnit unit, TaskQueue<Runnable> workQueue,
                                        ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        /***
         * 获取线程池中的任务数量
         * @return
         */
        public int getSubmittedTaskCount() {
            return submittedTaskCount.get();
        }
    
        /**
         * 获取线程池对象
         *
         * @return
         */
        public CustomThreadPoolExecutor getThreadPoolExecutor() {
            return CustomThreadPoolExecutorUtil.getCustomThreadPoolExecutor();
        }
    
        /**
         * 方法执行完毕之后执行
         *
         * @param r
         * @param t
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            // ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1
            submittedTaskCount.decrementAndGet();
        }
    
        /**
         * 重写execute 方法
         *
         * @param command
         */
        @Override
        public void execute(Runnable command) {
            if (command == null) {
                throw new NullPointerException();
            }
            // do not increment in method beforeExecute!
            // 将池中已提交的任务数 + 1
            submittedTaskCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) {
                // retry to offer the task into queue.
                final TaskQueue queue = (TaskQueue) super.getQueue();
                try {
                    if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                        logger.warn("submittedTaskCount:{},maximumPoolSize:{},queueSize:{},completedTaskCount:{}",
                                getSubmittedTaskCount(), getThreadPoolExecutor().getMaximumPoolSize(),
                                getThreadPoolExecutor().getQueue().size(), getThreadPoolExecutor().getCompletedTaskCount());
                        submittedTaskCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.", rx);
                    }
                } catch (InterruptedException x) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } catch (Throwable t) {
                // decrease any way
                submittedTaskCount.decrementAndGet();
                throw t;
            }
        }
    }

    测试类:

    package com.example.util;
    
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import lombok.SneakyThrows;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description: 自定义线程池队列
     * @Author: mingtian
     * @CreateDate: 2021/4/9 13:28
     * @Version: 1.0
     */
    public class CustomThreadPoolExecutorUtil {
        /**
         * 打印日志
         */
        private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutorUtil.class);
    
        /**
         * 默认 CPU 核心数
         */
        private static int threadPoolSize = 0;
    
        static {
            // 获取服务器 CPU 核心数
            threadPoolSize = Runtime.getRuntime().availableProcessors();
            logger.info("服务器 CPU 核心数量:{}", threadPoolSize);
        }
    
        public static int getThreadPoolSize() {
            return threadPoolSize;
        }
    
        /**
         * 线程工厂,用来创建线程
         */
        private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("port-pool-%d").build();
    
        private static TaskQueue taskQueue = new TaskQueue<>(10);
    
        /**
         * 自定义线程池
         */
        private static CustomThreadPoolExecutor CustomThreadPoolExecutor = new CustomThreadPoolExecutor(2, 2 * 2,
                60L, TimeUnit.SECONDS, taskQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());
    
        /**
         * 获取线程池对象方法
         *
         * @return
         */
        public static CustomThreadPoolExecutor getCustomThreadPoolExecutor() {
            return CustomThreadPoolExecutor;
        }
    
        /**
         * 模拟发送消息方法
         */
        public static class SendMessage implements Runnable {
            private int i;
    
            public SendMessage(int i) {
                this.i = i;
            }
    
            @SneakyThrows
            @Override
            public void run() {
                logger.info("我是第{}条消息,poolSize:{},queueSize:{},activeCount:{},completedTaskCount:{}", i,
                        CustomThreadPoolExecutor.getPoolSize(), CustomThreadPoolExecutor.getQueue().size(),
                        CustomThreadPoolExecutor.getActiveCount(), CustomThreadPoolExecutor.getCompletedTaskCount());
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            logger.info("-------------------------------开始测试--------------------------------------");
            taskQueue.setExecutor(CustomThreadPoolExecutor);
            for (int i = 1; i <= 16; i++) {
                CustomThreadPoolExecutorUtil.SendMessage sendMessage = new CustomThreadPoolExecutorUtil.SendMessage(i);
                CustomThreadPoolExecutor.execute(sendMessage);
            }
            Thread.sleep(10000);
            CustomThreadPoolExecutor.shutdown();
            logger.info("-------------------------------测试结束--------------------------------------");
        }
    }

    自定义线程池核心逻辑:

    当提交任务到CustomThreadPoolExecutor的时候,执行 submittedTaskCount.incrementAndGet(); 将线程池中数量+1处理,然后调用父类 super.execute(command); 执行。

    1         // 代码运行到此处,说明线程数 >= corePoolSize, 此时workQueue为自定义的TaskQueue       
    2         if (isRunning(c) && workQueue.offer(command)) {
    3             int recheck = ctl.get();
    4             if (! isRunning(recheck) && remove(command))
    5                 reject(command);
    6             else if (workerCountOf(recheck) == 0)
    7                 addWorker(null, false);
    8         }

    自定义队列核心逻辑:

    当执行到 workQueue.offer(command) 方法的时候走的我们自定义队列TaskQueue的offer方法,而offer方法的返回值决定着是否创建更多的线程:返回true,代表入队成功,不创建线程;返回false,代表入队失败,需要创建线程。

     1     public boolean offer(Runnable runnable) {
     2         if (threadPoolExecutor == null) {
     3             throw new RejectedExecutionException("The task queue does not have executor!");
     4         }
     5         // 线程池的当前线程数
     6         int currentPoolThreadSize = threadPoolExecutor.getPoolSize();
     7         if (threadPoolExecutor.getSubmittedTaskCount() < currentPoolThreadSize) {
     8             // 已提交的任务数量小于当前线程数,意味着线程池中有空闲线程,直接扔进队列里,让线程去处理
     9             return super.offer(runnable);
    10         }
    11 
    12         // return false to let executor create new worker.
    13         if (currentPoolThreadSize < threadPoolExecutor.getMaximumPoolSize()) {
    14             // 重点: 当前线程数小于 最大线程数 ,返回false,暗含入队失败,让线程池去创建新的线程
    15             return false;
    16         }
    17         // 重点: 代码运行到此处,说明当前线程数 >= 最大线程数,需要真正的提交到队列中
    18         return super.offer(runnable);
    19     }

    核心逻辑:当前线程数小于最大线程数就返回false,代表入队失败,需要创建线程。

    因此,总结起来就是:自定义的CustomThreadPoolExecutor依赖自定义的TaskQueue的offer返回值来决定是否创建更多的线程,达到先判断maximumPoolSize再判断队列的目的。

    3.参考文献

    tomcat 源码中的线程池也是使用的这样的思想,该例子来源于tomcat源码思想。

    tomcat 线程池 源码:

     private final AtomicInteger submittedCount = new AtomicInteger(0);
    
        public void execute(Runnable command, long timeout, TimeUnit unit) {
            submittedCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) {
                if (super.getQueue() instanceof TaskQueue) {
                    final TaskQueue queue = (TaskQueue)super.getQueue();
                    try {
                        if (!queue.force(command, timeout, unit)) {
                            submittedCount.decrementAndGet();
                            throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                        }
                    } catch (InterruptedException x) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(x);
                    }
                } else {
                    submittedCount.decrementAndGet();
                    throw rx;
                }
    
            }
        }

    tomcat 源码中 TaskQueue 源码:

        public boolean offer(Runnable o) {
          //we can't do any checks
            if (parent==null) return super.offer(o);
            //we are maxed out on threads, simply queue the object
            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
            //we have idle threads, just add it to the queue
            if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
            //if we have less threads than maximum force creation of a new thread
            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
            //if we reached here, we need to add it to the queue
            return super.offer(o);
        }
  • 相关阅读:
    深度学习入门零基础笔记(一)一些相关链接
    华为云计算笔记(摘要略读,零基础)(九)(虚拟化特性介绍华为虚拟化产品特性)
    华为云计算笔记(摘要略读,零基础)(八)(虚拟化特性介绍-虚拟化特性)
    华为云计算笔记(摘要略读,零基础)(七)(虚拟化特性介绍集群特性介绍)
    华为云计算笔记(摘要略读,零基础)(六)(云计算存储基础介绍)
    华为云计算笔记(摘要略读,零基础)(五)(云计算网络基础介绍)
    华为云计算笔记(摘要略读,零基础)(四)(实验 FusionCompute安装)
    华为云计算笔记(摘要略读,零基础)(三)(KVM介绍、FusionCompute架构)
    华为云计算笔记(摘要略读,零基础)(一)(云计算介绍)
    基于kolla部署openstack
  • 原文地址:https://www.cnblogs.com/ming-blogs/p/14636960.html
Copyright © 2011-2022 走看看