zoukankan      html  css  js  c++  java
  • 线程池基础

    一、java.util.concurrent包下的线程池相关的类和接口

    JUC包下的三个接口:
    1、Executor:运行新任务的简单接口,将任务提交和任务执行细节解耦
    2、ExecutorService:具备管理执行器和任务生命周期方法,提交任务机制更完善
    3、ScheduledExecutorService:支持Future和定期执行任务

    ThreadPoolExecutor、ForkJoinPool、ScheduledThreadPoolExecutor使用创建线程池实例的,通过executors工具类来创建:

    1、newFixedThreadPool(int nThreads)
    指定工作线程数量的线程池

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    2、newCachedThreadPool()
    处理短时间工作任务的线程池,
    (1)试图缓存线程并重用,当无缓存线程可用,就会创建新的工作线程;
    (2)如果线程闲置时间超过阀值,则会被终止并移除缓存。
    (3)系统长时间闲置时,不会消耗什么资源

    执行execute方法时,首先会先执行SynchronousQueue的offer方法提交任务,并查询线程池中是否有空闲线程来执行SynchronousQueue的poll方法来移除任务。如果有,则配对成功,将任务交给这个空闲线程。否则,配对失败,创建新的线程去处理任务;当线程池中的线程空闲时,会执行SynchronousQueue的poll方法等待执行SynchronousQueue中新提交的任务。若超过60s依然没有任务提交到SynchronousQueue,这个空闲线程就会终止;因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    3、newSingleThreadExcutor()
    创建唯一的工作线程来执行任务,如果线程异常结束,会有另一个线程来取代他。

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

    4、newSingleThreadScheduledExcutor()与newScheduledThreadPool(int corePoolSize)
    定时或者周期性的工作调度,两者的区别在于单一的工作线程还是多个线程

        public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }

    5、newWordStealingPool()
    内部会构建ForkJoinPool,利用working-stealing算法,并行的处理任务,不保证处理顺序。
    fork/join把大任务切割成小任务执行,最终汇总每个小任务结果后得到大任务结果的框架。

        public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }

    可以看出:

    newFixedThreadPool(int nThreads)、newCachedThreadPool()、newSingleThreadExcutor()使用的是ThreadPoolExecutor类来创建线程;

    newSingleThreadScheduledExcutor()与newScheduledThreadPool(int corePoolSize)使用的是ScheduledThreadPoolExecutor类;

    newWordStealingPool()使用的是ForkJoinPool类。

    运行一个线程池:

    import lombok.SneakyThrows;
    
    import java.util.concurrent.*;
    
    public class ThreadPoolTest {
        @SneakyThrows
        public static void main(String[] args){
            Executor executor = Executors.newFixedThreadPool(3);
    
            executor.execute(new MyRunnable());
        }
    
        static class MyRunnable implements Runnable{
    
            @SneakyThrows
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":is start...");
                Thread.sleep(300);
                System.out.println(Thread.currentThread().getName()+":is over...");
            }
        }
    
    }

    二、通过ThreadPoolExecutor构造一个线程池需要用到的参数。

    1、corePoolSize:核心线程数,长久存在
    2、maximumPoolSiz:最大线程数
    3、workQueue:任务等待队列,java默认的等待队列没有限制,会无限存放,容易造成内存泄漏。
    4、keepLiveTime:空闲线程(大于corePoolSize数的线程)的存在时间
    5、TimeUnit:一个时间类型的枚举类。有从纳秒到天的时间量度,配合上面的keepAliveTime确定非核心线程的存活时间。
    6、threadFactory:线程工厂,创建新线程,Executors.defaultThreadFactory()
    7、RejectedExecutionHandler:线程池饱和策略,有以下4种
    AbortPolicy:直接抛弃异常,这是默认策略
    CallerRunsPolicy:用调用者所在的线程来执行任务
    DiscardOldestPolicy:丢弃队列中靠最前的任务,并执行当前任务
    DiscardPolicy:直接丢弃任务
    实现RejectedExecutionHandlerjiekou的自定义handler。

    自定义一个线程池:

    import lombok.SneakyThrows;
    
    import java.util.concurrent.*;
    
    public class ThreadPoolTest {
        @SneakyThrows
        public static void main(String[] args){
            int corePoolSize = 3;//核心线程数
            int maximumPoolSize = 10;//最大线程数
            int keepAliveTime = 2;//空闲线程存活时间
            TimeUnit unit = TimeUnit.MILLISECONDS;//keepAliveTime时间单位
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();//等待队列
            ThreadFactory threadFactory = Executors.defaultThreadFactory();//线程工厂,用于创建线程
            RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();//线程池饱和策略
    
            ExecutorService executor = new ThreadPoolExecutor(
                    corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    queue,
                    threadFactory,
                    handler
                    );
    
            executor.execute(new MyRunnable());
    
    
        }
    
        static class MyRunnable implements Runnable{
    
            @SneakyThrows
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":is start...");
                Thread.sleep(300);
                System.out.println(Thread.currentThread().getName()+":is over...");
            }
        }
    
    }

    结果:

    pool-1-thread-1:is start...
    pool-1-thread-1:is over...

    三、当线程池有新任务提交execute执行时。

    1、如果线程池中的线程少于corePoolSize,则创建新的线程来处理任务,即使线程池中的其他线程是空闲的。
    2、如果线程池中的的线程大于corePoolSize且小于maximunPoolSize,则只用当workQueue满时才创建新的线程去处理任务。
    3、如果设置的corePoolSize和maximumPoolSize相同,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去workQueue中取任务。
    4、如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务。

    四、线程池的状态

    running:能接受新提交的任务,并且也能处理阻队列中的任务
    shutdown:等待线程任务完成,不再接受新提交的任务,也不处理队列中的任务
    stop:不等待线程任务完成,不再接受新提交的任务,也不处理队列中的任务
    tidying:所有的任务都已终止
    terminated:terminated()方法执行完后进入的状态

    五、ExecutorService中的方法

    1、submit,可以接收返回值

    <T> Future<T> submit(Callable<T> task);

    需要先实现Callable接口,Callable是个函数式接口,类似于Runnable,但是它的call方法需要设定返回值类型。

    submit传回一个Future对象,调用它的get方法可以获得返回值。

    结果:

    2、关闭线程池

    shutdown方法,等待所有线程执行完毕后,不再接受新提交的任务,也不处理队列中的任务,然后关闭线程池。

    shutdownNow,立即关闭线程池,不等待线程执行完毕。

    import lombok.SneakyThrows;
    
    import java.util.concurrent.*;
    
    public class ThreadPoolTest {
        @SneakyThrows
        public static void main(String[] args){
            int corePoolSize = 3;//核心线程数
            int maximumPoolSize = 10;//最大线程数
            int keepAliveTime = 2;//空闲线程存活时间
            TimeUnit unit = TimeUnit.MILLISECONDS;//keepAliveTime时间单位
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();//等待队列
            ThreadFactory threadFactory = Executors.defaultThreadFactory();//线程工厂,用于创建线程
            RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();//线程池饱和策略
    
            ExecutorService executor = new ThreadPoolExecutor(
                    corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    queue,
                    threadFactory,
                    handler
                    );
    
    
           executor.execute(new MyRunnable());
    //       executor.shutdown();
    //       executor.shutdownNow();
    
    
        }
        static class MyRunnable implements Runnable{
    
            @SneakyThrows
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":is start...");
                Thread.sleep(300);
                System.out.println(Thread.currentThread().getName()+":is over...");
            }
        }
    
    }

    使用shutdown:

    pool-1-thread-1:is start...
    pool-1-thread-1:is over...

    使用shutdownNow:

    pool-1-thread-1:is start...
    Exception in thread "pool-1-thread-1" java.lang.Error: java.lang.InterruptedException: sleep interrupted
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at com.imooc.basic.thread.ThreadPoolTest$MyRunnable.run(ThreadPoolTest.java:41)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        ... 2 more

    之所以会产生异常,因为shutdownNow方法会调用了线程的Interrupted方法,这里没有处理异常而是直接抛出。但是从结果可以看出线程没有over,已被中断。

    import lombok.SneakyThrows;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    public class ThreadPoolTest {
        public static void main(String[] args){
            Executor executor = Executors.newFixedThreadPool(3);
    
            executor.execute(new ThreadTest());
            
    
        }
    
        static class ThreadTest implements Runnable{
    
            @SneakyThrows
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":is start...");
                Thread.sleep(300);
                System.out.println(Thread.currentThread().getName()+":is over...");
            }
        }
    }
    就算这个世道烂成一堆粪坑,那也不是你吃屎的理由
  • 相关阅读:
    *args和**kwargs
    事件驱动模型
    同步异步和阻塞非阻塞
    多进程和多线程
    认识tornado(五)
    认识tornado(四)
    认识tornado(三)
    [GO]使用select实现超时
    [GO]使用select实现斐波那契
    [GO]ticker的使用
  • 原文地址:https://www.cnblogs.com/whalesea/p/13138847.html
Copyright © 2011-2022 走看看