zoukankan      html  css  js  c++  java
  • 线程池

    通过Executor创建线程池

    Executors.newFixedTreadPool 

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    内部通过new ThreadPoolExecutor创建线程池

    返回一个固定数量的线程池。如果线程池中有空闲线程则直接交给空闲线程执行。如果没有将任务放到队列 默认使用的是LinkedBlockingQueue 无界队列,如果大量任务线程池线程来不及处理,产生无限堆积可能会有OOM风险

    Executors.newSingleThreadExecutor 

     public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

    返回一个线程的线程池,如有空闲则执行,没有则将任务放到队列中等待 LinkedBlockingQueue 无界队列,如果大量任务线程池线程来不及处理,产生无限堆积可能会有OOM风险,

    通过DelegatedExecutorService包装返回 重写了finalize 如果我们没有手动shtdown在GC回收的时候会调用此方法完成回收 

    同时通过FinalizableDelegatedExecutorService包装 只能暴露ExecutorService相关方法

        static class FinalizableDelegatedExecutorService
                extends Executors.DelegatedExecutorService {
            FinalizableDelegatedExecutorService(ExecutorService executor) {
                super(executor);
            }
            //重写了finalize 如果我们没有手动shtdown在GC回收的时候会调用此方法完成回收 
            protected void finalize() {
                super.shutdown();
            }
        }
    static class DelegatedExecutorService extends AbstractExecutorService {
            private final ExecutorService e;
            DelegatedExecutorService(ExecutorService executor) { e = executor; }
            public void execute(Runnable command) { e.execute(command); }
            public void shutdown() { e.shutdown(); }
            public List<Runnable> shutdownNow() { return e.shutdownNow(); }
            public boolean isShutdown() { return e.isShutdown(); }
            public boolean isTerminated() { return e.isTerminated(); }
            public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
                return e.awaitTermination(timeout, unit);
            }
            public Future<?> submit(Runnable task) {
                return e.submit(task);
            }
            public <T> Future<T> submit(Callable<T> task) {
                return e.submit(task);
            }
            public <T> Future<T> submit(Runnable task, T result) {
                return e.submit(task, result);
            }
            public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
                return e.invokeAll(tasks);
            }
            public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                                 long timeout, TimeUnit unit)
                throws InterruptedException {
                return e.invokeAll(tasks, timeout, unit);
            }
            public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
                return e.invokeAny(tasks);
            }
            public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                                   long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
                return e.invokeAny(tasks, timeout, unit);
            }
        }

    Executor.newCachedTreadPool 

     public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    返回一个根据实际情况调整线程个数的线程池塘 不限制最大线程数,如果有空闲线程则直接交给空线程执行 没有则创建,线程空闲超过60秒则指定回收

    Exucutors.newScheduledThreadPool 

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }


      public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    public class ScheduledThreadPoolExecutor
            extends ThreadPoolExecutor
            implements ScheduledExecutorService

    可以发现还是通过ThreaPoolExecutor实现 队列使用DeayedWorkQueue

    返回SchededExecutoryService对象

    可以实现定时任务

     public static void main(String[] args) throws InterruptedException {
           ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(1);
           scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
               @Override
               public void run() {
                   System.out.println("11");
               }
           },1,3,TimeUnit.SECONDS);
          // 1为延迟多久执行  3为轮训时间  TimeUnit.seconds为 时间单位
        }

    自定义线程池

    ThreadPoolExecutor的构造函数
     public ThreadPoolExecutor(int corePoolSize,//核心线程数量
                                  int maximumPoolSize,//最大线程数量(如果没有超过最大线程数量 没有空闲线程则创建)
                                  long keepAliveTime,//线程的生命周期 超过了corePoolSize的空闲线程回收时间
                                  TimeUnit unit,//keepAliveTime时间单位
                                  BlockingQueue<Runnable> workQueue,//当
                                  ThreadFactory threadFactory,//线程工厂一般默认即可
                                  RejectedExecutionHandler handler)//队列有有界队列。如果任务队列满了以后。拒绝的任务的自定义操作

    corePoolSize

          核心线程数,在创建线程池后,默认情况下线程池中并没有任何线程,而是等待任务到来才去创建线程。当线程池中的线程数目达到corePoolSize后,新来的任务将会被添加到队列汇总,也就是workQueue

    ThreadPoolExecutor#prestartAllCoreThreads() 方法或者是通过ThreadPoolExecutor # prestartCoreThread()方法预创建线程,不用等到任务来了之后才创建,大小一般设置机器核数

    int threadCount = Runtime.getRuntime().availableProcessors();

    maximumPoolSize

                      最大线程数量,前面说了线程池的数量大于等于corePoolSize同时没有空闲线程,还有任务进来。会放到workQueue,当workQueue满了之后会看线池线程数量是否大于maximumPoolSize,如果不大于则创建线程执行任务,如果队列满了线程数量又等于maximumPoolSize则触发拒绝策略配置的RejectedExecutionHandler

    keepAliveTime

                      corePoolSize之后创建线程的线程存活时间指的是非corePoolSize这部分线程即corePoolSize之后创建的线程,如果超过指定时间还有没执行过任务

    workQueue

                     阻塞队列,如果线程数量大于corePoolSize同时没有空闲线程,还有任务到来既会尝试加入此对了,有可能成功有可能失败,失败一般指的是有界队列 队列满了。


    threadFactory

                    线程工厂。用来为线程池创建线程,当我们不指定线程工厂时,线程池内部会调用Executors.defaultThreadFactory()创建默认的线程工厂,其后续创建的线程优先级都是Thread.NORM_PRIORITY。如果我们指定线程工厂,我们可以对产生的线程进行一定的操作。

    handler

              拒绝执行策略。当线程池的缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,

    线程池核心调度源码

      public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //workerCountOf 获取线程数量如果数量小于corePoolSize 则直addWorker调度执行
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            /**
             *   如果线程数量超过corePoolSize 则调用如果进入等待队列失败 则执行拒绝策略
             *   进入等待队列失败比如队列满了ArrayBlockingQueue 或者SynchronousQueue
             *    则直接提交调度执行
             */
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //入队列失败 直接调度执行 如果线程数量超过max 则执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }

    workQueue的几种队列

    https://www.cnblogs.com/LQBlog/p/8733764.html

    线程池拒绝策略

    jdk默认拒绝策略

    AbortPolicy 该策略直接抛出异常 影响系统正常运行

        public static class AbortPolicy implements RejectedExecutionHandler {
    
            public AbortPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                //抛出异常
                throw new RejectedExecutionException("Task " + r.toString() +
                        " rejected from " +
                        e.toString());
            }
        }

    CallerRunsPolicy 当前线程执行

        public static class CallerRunsPolicy implements RejectedExecutionHandler {
           
            public CallerRunsPolicy() { }
            
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                //线程池不是shutDown则直接调用run方法 当前线程执行
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }

    DiscardOldestPolicy 个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。 显然这样会影响线程提交性能

        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            public DiscardOldestPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    //从队列获取并删除元素
                    e.getQueue().poll();
                    //再次调用execute 方回
                    e.execute(r);
                }
            }
        }

    DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。

     public static class DiscardPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardPolicy}.
             */
            public DiscardPolicy() { }
    
            /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }

    如果默认的拒绝策略无法满足 则可以自己通过实现RejectedExecutionHandler接口

    比如一些耗时操作。Runable对象封装业务单号。拒绝策略执行落库 后期重试如

         public MyRejectedExecutionHandler() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if(r instanceof  OrderTask){
                    OrderTask orderTask=(OrderTask)r;
                    String orderTaskName=orderTask.getOrderTask();
                    String parameter=orderTask.getParameter();
                    //执行insert 落库或者发mq后期重试
                }
            }
        }

    线程工厂ThradFactory

    jdk提供的2种默认工厂

    Executors.defaultThreadFactory() 返回用于创建新线程的默认线程工厂。

    Executors.privilegedThreadFactory()    返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限。

    自定义线程名字的Factory

    /**
     * @Project micro-service
     * @PackageName cn.wine.ms.promotion.utils.thread.factory
     * @ClassName NameThreadFacotry
     * @Author qiang.li
     * @Date 2021/8/10 9:57 上午
     * @Description 指定工作线程名字 方便排查问题,不用随机的名字
     * 参考Executors.defaultThreadFactory() 实现 只是自定义namePrefix
     */
    public class NameThreadFactory implements ThreadFactory {
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        public NameThreadFactory(String threadNamePrefix) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    threadNamePrefix+
                    "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

     扩展线程池

    可以通过这几个方法扩展 监听线程的执行时间

     public static  class SimpleThreadPoolExecutor extends  ThreadPoolExecutor{
    
    
            public SimpleThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            }
    
            /**
             * 线程执行之前调用
             * @param t
             * @param r
             */
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t,r);
            }
    
            /**
             *  线程执行之后调用
             * @param r
             * @param t
             */
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
            }
    
            /**
             * terminaerd 线程退出时调用
             */
            @Override
            protected void terminated() {
                super.terminated();
            }
        }

    线程池大小计算公式

     感觉不大适用  现在是集群 同时一个应用不止一个线程池

    可以设置为cpu核数

       int threadCount = Runtime.getRuntime().availableProcessors();

    线程池的堆栈异常信息

    无法打印异常信息的写法

      ExecutorService executorService=Executors.newCachedThreadPool();
            for(int i=0;i<10;i++){
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        int j=1/0;
                        System.out.println("ddd");
                    }
                });
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }

    可以打印异常信息的写法

    方法一 改为execute

    ExecutorService executorService=Executors.newCachedThreadPool();
            for(int i=0;i<10;i++){
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        int j=1/0;
                        System.out.println("ddd");
                    }
                });
            }
          
        }

    方法二 使用future.get

       ExecutorService executorService=Executors.newCachedThreadPool();
            for(int i=0;i<10;i++){
                Future future= executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        int j=1/0;
                        System.out.println("ddd");
                    }
                });
                //使用get
                future.get();
            }

     缺点就是能能定位到哪里抛出的异常 并不能定位到哪里提交的task

    重写线程池打印异常

    /**
     * @author liqiang
     * @date 2019/9/19 18:43
     * @Description:
     */
    public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
        public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        @Override
        public void execute(Runnable command) {
            super.execute(warp(command,clientTrace(),Thread.currentThread().getName()));
        }
    
        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(warp(task,clientTrace(),Thread.currentThread().getName()));
        }
    
        private Exception clientTrace(){
            //保存着提交线程的堆栈信息
           return  new Exception("client stack trace");
        }
    
        public  Runnable warp(final Runnable task,final Exception clientException,String clientThreadName){
            return new Runnable() {
                @Override
                public void run() {
                    try{
                        task.run();
                    }catch (Exception e){
                        //发生异常后打印提交线程的堆栈信息
                        clientException.printStackTrace();
                        throw e;
                    }
                }
            };
        }
    }
      TraceThreadPoolExecutor executorService=new TraceThreadPoolExecutor(0,1,50,TimeUnit.MINUTES,new SynchronousQueue());
            for(int i=0;i<10;i++){
                Future future= executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        int j=1/0;
                        System.out.println("ddd");
                    }
                });
            }

    重写后就能正常打印到提交地点的堆栈信息

  • 相关阅读:
    Django(模板语言-自定义filter和simple_tag)
    vue项目创建步骤小结
    scrapy 相关
    face parsing(人脸解析)
    FSRNet: End-to-End Learning Face Super-Resolution with Facial Priors论文阅读
    第一周,深度学习的实用层面
    如何阅读英文文献
    学习笔记
    Joint Super-Resolution and Alignment of Tiny Faces
    Super-FAN:Integrated facial landmark localization and super-resolution of real-world low resolution faces in arbitrary poses with GANs论文阅读
  • 原文地址:https://www.cnblogs.com/LQBlog/p/8735356.html
Copyright © 2011-2022 走看看