zoukankan      html  css  js  c++  java
  • 一行一行往上爬

    主要参数

    corePoolSize:核心线程数

    核心线程会一直存活,及时没有任务需要执行。当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理。

    maxPoolSize:最大线程数

    当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务。
    当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常。

    keepAliveTime:非核心线程闲置时的超时时长

    超过这个时长,非核心线程就会被回收。当ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true 时,keepAliveTime 同样会作用于核心线程。

    unit:用于指定 keepAliveTime 参数的时间单位

    常用的有 TimeUnit .MILLISECONDS 和 TimeUnit .SECONDS。

    workQueue:线程池中的任务队列

    通过线程池的 execute 方法提交的 Runnable 对象会存储在这个参数中。

    threadFactory:线程工厂

    为线程池提供创建新的线程的功能。threadFactory 是一个接口,它只有一个方法: public abstract Thread newThread (Runnable r);

    RejectedExecutionHandler:通常叫做拒绝策略

    在线程池已经关闭的情况下或者当线程数已经达到maxPoolSize且队列已满。只要满足其中一种时,在使用execute() 来提交新的任务时将会拒绝,而默认的拒绝策略是抛一个 RejectedExecutionException 异常。

    allowCoreThreadTimeout: 是否允许主线程超时回收

    看看代码

    //执行线程
    executor.execute(workers[i]);
    //执行代码如下

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();

            int c = ctl.get();
        //1. 当运行的线程数小于corePoolSize的时候 ,创建新的线程即Worker执行提交的任务
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
        /**
        2. 如果线程数大于等于corePoolSize的时候,将任务提交到workQueue队列中 ,如果成功添加 ,即在上面的runWorker就会执行调用了,当然这里会重新的核查此时的线程数,看下是否有线程减少,如果减少,则创建新的线程来使线程数维持在corePoolSize的数目
        */

            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(nullfalse);
            }
            /*
             * 3. 如果队列满了后,则创建新的线程来执行,当然这里有一种极端情况,当线程数等于maximumPoolSize时,并且workQueue也满了后,则会使用
             */

            else if (!addWorker(command, false))
                reject(command);
        }

    那么addWorker 方法是如何唤醒runWorker的呢?

    我们看下addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {

            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());

                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //这里将t线程启动起来了,那么t是什么,是w.thread w是woker,thread是worker的一个属性,看下面这段代码
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

    //这里表明thread里面的target(runnable)就是Worker,所以唤醒的时候是唤醒的Worker的run方法,再看run方法
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    这段核心代码

    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

    当阻塞队列中没有任务时,等待时间达到keepAliveTime毫秒值时就会被自动唤醒,而不会永远地沉睡下去。也就是说设置为60s还是没有取到task,说明没有任务了,就把这个worker给移除了。哇,好巧妙

    线程池的实际应用

    • 1、一般需要根据任务的类型来配置线程池大小:如果是CPU密集型任务,就要减少CPU的线程切换频率,参考值可以设为 N*CPU+1 , 如果是IO密集型任务,就需要尽量压榨CPU,让CPU可以处理更多任务,参考值可以设置为2*NCPU

    • 2、一般情况下,队列的大小遵循下面的公事:

    queSize <= ClientTimeOut(秒) * TPS , 队列大小 小于等于 客户端超时 * 每秒处理的交易数 , 比如客户端超时60s,TPS为1 , 那么队列长度就可以设置为60,因为第60个任务加到队列里面的时候,第一个任务还没处理完。那么第61个来的时候,等TPS为1 ,肯定处理时间会大于60s,所以第61条及以后的直接超时了。所以队列长度过长没有意义。

    • 3、比如你是全程异步的系统你的队列设置可以设置为0,触发拒绝策略,启用补偿机制,慢慢补偿异步任务,如果无界队列,会不断的在queue里面添加任务,corePoolSize设置为cpu核数。

    • 4、当请求速度远大于处理速度,队列就会无限加入也会造成 资源耗尽,服务宕掉

    /**
         * 默认的核心线程数是CPU*2 最大线程数是500
         */

        private final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
        private final int DEFAULT_MAX_POOL_SIZE = 500;

        private Integer corePoolSize;
        private Integer maxPoolSize;

        @PostConstruct
        public void init() {
            corePoolSize = //from db config
            if (corePoolSize == null) {
                corePoolSize = DEFAULT_CORE_POOL_SIZE;
            }
            maxPoolSize = //from db config
            if (maxPoolSize == null) {
                maxPoolSize = DEFAULT_MAX_POOL_SIZE;
            }
        }


    /**
         * 扣款异步线程池
         *
         * @return
         */

        @Bean
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            //这里是用的Spring的线程池,一样
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            //空闲线程存活时间60s
            executor.setKeepAliveSeconds(60);
            //队列大于0就是LinkBlockQueue无界队列,小于等于0则是队列采用SynchronizedQueue无缓存队列
            executor.setQueueCapacity(0);
            executor.setThreadNamePrefix(“AS”+Thread.currentThread().getName());
            executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                    log.info("[异步线程池] 发生拒绝异常,触发补偿扣款自动任务 coreSize:{} maxSize:{}", e.getCorePoolSize(), e.getMaximumPoolSize());
                    throw new RejectedExecutionException("Task " + r.toString() +
                            " rejected from " +
                            e.toString());
                }
            });
            return executor;
        }
    public void startAsynTask() {
            try {
                asynTaskService.asynProcess();
            }
            //正常请求异步线程池满,抛出异常,启动补偿机制
            catch (RejectedExecutionException e) {
                log.warn("[异步线程池满] 进入消息队列重新投递");
                //重新投递  可以重新弄个task扫描,减小压力
            }
        }

    @Async("threadPoolTaskExecutor")
    public void asynProcess(){
     // 你的业务代码
    }
  • 相关阅读:
    利用C# + GDI plus模拟杂乱无章的现实场景
    Windows Identity Foundation已包含在.NET 4.5中
    实体框架 6.0:异步、IQueryable操作符和特性改进
    Knotter 0.7.0 发布,交错图案设计工具
    实体框架 5.0:空间数据类型、性能增强、数据库提升
    JFormDesigner 5.2 Beta 发布,Swing设计工具
    获取泛型参数的泛型类型
    Android MapView 申请apiKey
    Android Animation学习笔记
    eclipse 无法启动 JVM terminated. Exit code=1
  • 原文地址:https://www.cnblogs.com/itar/p/11401733.html
Copyright © 2011-2022 走看看