zoukankan      html  css  js  c++  java
  • Java线程池Executor&ThreadPool

      java自1.5版本之后,提供线程池,供开发人员快捷方便的创建自己的多线程任务。下面简单的线程池的方法及说明。

      1、Executor 

        线程池的顶级接口。定义了方法execute(Runnable),该方法接收一个Runnable实例,用来执行一个任务,该任务即是一个实现Runnable接口的类。

        此服务方法无返回值,原因是因为实现Runnable接口的类的run方法是无返回(void)的。

        常用方法 : void execute(execute)

        作用 : 启动并执行线程任务

      2、ExecutorService

        继承自Executor接口,提供了更多的方法调用,例如优雅关闭方法shutdown,有返回值的submit。

        2.1、ExecutorService生命周期

          运行 - Running 、关闭 - shuttingdown、终止 - terminated

          Running : 线程池正在执行中,活动状态。创建后即进入此状态

          shuttingdown : 优雅关闭,线程池正在关闭中。不再接收新的线程任务,已有的任务(正在处理的 + 队列中阻塞的),处理完毕后,关闭线程池。

                  调用shutdown()方法,即进入此状态

          terminated : 线程池已关闭。

        2.2、submit方法

          有返回值,Future类型。重载了方法,submit(Runnable)不需要提供返回值。submit(Callable)、submit(Runnable,T)可以提供线程执行后的结果返回值。

        2.3、Future

          线程执行完毕结果。获取线程执行结果是通过get()方法获取。get()无参,阻塞等待线程执行结束。

          get(long timeout, TimeUnit unit)有参,阻塞等待固定时长,超时未获取,则抛出异常。

        2.4、Callable

          类似Runnable的一个线程接口。其中的对应run的方法是call方法。此接口提供了线程执行完毕返回值。

    package com.cn.cfang.executor;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class Test {
        public static void main(String[] args) throws Exception{
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            Data data = new Data();
    //        Future<Data> future = executorService.submit(new Task(data), data); //runnable
            Future<Data> future = executorService.submit(new Task1(data)); //callable
            System.out.println(future.get().getName());
            executorService.shutdown();
        }
    }
    
    class Data {
        String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    }
    
    class Task implements Runnable{
        Data data;
        public Task(Data data) {
            this.data = data;
        }
        @Override
        public void run() {
             data.setName("hello world");
        }
    }
    
    class Task1 implements Callable<Data>{
        Data data;
        public Task1(Data data) {
            this.data = data;
        }
        @Override
        public Data call() throws Exception {
            data.setName("hello world");
            return data;
        }
        
    }

       3、Executors工具类

        提供了很多的工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

    线程池属于进程级的重量级资源,默认的生命周期同JVM一致,当开启线程池后,直到jvm关闭,是线程池的默认的生命周期。
    如果手动调用shutdown方法,可优雅关闭线程池,在当前所有任务执行结束后,关闭线程池。

      4、几种常用的线程池

        4.1、FixedThreadPool

         容量固定的线程池。使用LinkedBlockingQueue作为任务队列,当任务数量大于线程池容量的时候,未执行的任务进入任务等待队列LinkedBlockingQueue中,

         当线程有空闲的时候,自动从队列中取出任务执行。

            使用场景: 大多数情况下,推荐使用的线程池。因为os系统和硬件是有线程上限限制的,不可能去无限的提供线程池操作。

        4.2、CachedThreadPool

          缓存线程池。容量 0-Integer.MAX_VALUE,自动根据任务数扩容:如果线程池中的线程数不满足任务执行需求,则创建新的线程并添加到池中。

          生命周期默认60s,当线程空闲时长到60s的时候,自动终止销毁释放线程,移除线程池。

          使用场景 : 可用于测试最高负载量,用于对FixedThreadPool容量的参考。

          注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT(默认60s)不活动,其会自动被终止。 

        4.3、ScheduledThreadPool

          定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

    public static void main(String[] args) {
            ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
            System.out.println(service);
            
            // 定时完成任务。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
            // runnable - 要执行的任务。
         // start_limit - 第一次执行任务的时间间隔
         // limit - 多次任务执行的时间间隔
    // timeunit - 时间单位 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } }, 0, 300, TimeUnit.MILLISECONDS); }

        4.4、SingleThreadExceutor 单一容量线程池。

        4.5、自定义线程池

          自定义线程池,可以使用ThreadPoolExecutor类来进行创建管理。线程池中,除了ForkJoinPool外,其他常用的线程池底层,都是使用ThreadPoolExecutor实现的。

          参数说明:

            corePoolSize:核心线程数,也是最少线程数。在创建线程池时,默认情况下,是不会创建线程池的,也即此时的线程池中线程数为0,直到有任务来临时,才会去创建线程。当然,手动调用prestartCoreThread()或者prestartAllCoreThreads()方法,可以初始化创建线程池中的线程。默认情况下,当有任务来临时,就会创建新的线程去处理执行,即使此时线程池中有空闲的线程。当线程数达到corePoolSize时,线程数不增加,此时任务会放入等待队列BlockingQueue中。

            workQueue:阻塞队列,用来存储等待执行的任务资源。

              maximumPoolSize:最大线程数。当阻塞队列满了,开始扩充线程池中的线程数。直到达到此最大值的时候。

            handler:当线程池中的线程数等于maximumPoolSize的时候,此时再来任务的话,交由此拒绝策略执行。

            keepAliveTime:表示的线程在空闲多长时间后会被终止。默认是在线程数大于corePoolSize才生效,也可以手动设置allowCoreThreadTimeOut()方法让线程数在不大于  corePoolSize也生效。

     public ThreadPoolExecutor(
                 int corePoolSize, //核心容量,创建线程池的时候,默认有多少的线程数。也是最少线程数
                       int maximumPoolSize, //最大线程数
                       long keepAliveTime,  //线程生命周期,0为永久。当线程空闲多长时间,自动回收。
                       TimeUnit unit,  //生命周期时间单位。
                      BlockingQueue<Runnable> workQueue,  //任务阻塞队列。
               RejectedExecutionHandler handler
          ) {     
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);       }

          简单例子:

    package com.cn.cfang.executor;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Test2 {
    
        public static void main(String[] args){ 
            //创建等待队列 
            BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20); 
            //创建线程池,池中保存的线程数为3,允许的最大线程数为5
            ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue); 
            //创建七个任务 
            Runnable t1 = new MyThread(); 
            Runnable t2 = new MyThread(); 
            Runnable t3 = new MyThread(); 
            Runnable t4 = new MyThread(); 
            Runnable t5 = new MyThread(); 
            Runnable t6 = new MyThread(); 
            Runnable t7 = new MyThread(); 
            //每个任务会在一个线程上执行
            pool.execute(t1); 
            pool.execute(t2); 
            pool.execute(t3); 
            pool.execute(t4); 
            pool.execute(t5); 
            pool.execute(t6); 
            pool.execute(t7); 
            //关闭线程池 
            pool.shutdown(); 
        } 
    }
    
    class MyThread implements Runnable{ 
        @Override
        public void run(){ 
            System.out.println(Thread.currentThread().getName() + "正在执行。。。"); 
            try{ 
                Thread.sleep(100); 
            }catch(InterruptedException e){ 
                e.printStackTrace(); 
            } 
        } 
    }

     5、forkjoin框架

        拆分合并,将一个大的任务,拆分成若干子任务,并最终汇总子任务的执行结果,得到大任务的执行结果。并行执行,采用工作窃取机制,更加有效的利用cpu资源。

        5.1、主要类

          ForkJoinPool : 用于执行Task。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。

                  当一个工作线程的队列里暂时没有任务时,它会随机从其他未完成工作线程的队列的尾部获取一个任务。

          ForkJoinTask:ForkJoin任务,提供在任务中执行fork()和join()操作的机制(二叉分逻辑),通常不直接继承ForkJoinTask类,

                 而是继承抽象子类RecursiveTask(有返回结果) 或者 RecursiveAction (无返回结果)。

          ForkJoinWorkerThread:ForkJoinPool 内部的worker thread,用来具体执行ForkJoinTask。内部有 ForkJoinPool.WorkQueue,来保存要执行的 ForkJoinTask。

          ForkJoinPool.WorkQueue:保存要执行的ForkJoinTask。

       5.2、工作窃取机制

          1、大任务分割成N个子任务,为避免线程竞争,于是分开几个队列去保存这些子任务,并为每个队列提供一个工作线程去处理其中的任务。工作线程与任务队列一一对应。

          2、如果A线程执行完自己队列中的所有任务,如果此时其他队列中还有未执行的任务,则A线程会去窃取一个其他队列的任务来执行。但是,此时两个线程同时访问,

            可能会产生竞争问题,所以,任务队列设计成了双向队列。A线程窃取的时候,从另一端开始执行,尽可能的去避免线程竞争问题。

          3、工作窃取机制,充分的利用线程资源,并尽可能的去避免线程间的竞争问题。但是,只能是尽可能避免,并不能规避。例如,双向队列只有一个任务。

         5.3、简单使用

          例:求和 0 - 10000000000L。

    package com.cn.cfang.executor;
    
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * 步骤:
     *  1、建立任务类Task,继承RecursiveTask或者RecursiveAction。需要返回值则选用RecursiveTask,无需返回值选用RecursiveAction
     *  2、任务类Task,满足一定的阈值,则对子任务进行计算,不满足,则二叉分后,递归调用自身
     *  3、调用中,新建ForkJoinPool对象,新建任务类对象Task,将任务类对象Task放入ForkJoinPool中执行。
     *     如果需要返回值,则可以invoke或者Future-submit。
     * @author cfang
     * 2018年5月15日 上午10:51:03
     */
    public class Test3 {
        
        public static void main(String[] args) throws Exception{
            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinWorkTask task = new ForkJoinWorkTask(0l, 10000000l);
    //        Long result = pool.invoke(task);
    //        System.out.println(result);
            Future<Long> future = pool.submit(task);
            System.out.println(future.get());
        }
        
    }
    
    class ForkJoinWorkTask extends RecursiveTask<Long>{
    
        private static final long serialVersionUID = 1L;
        
        private Long start;    //起始
        private Long end;    //终止
        private static final Long THRESHOLD = 10000L; //子任务分割阈值
        
        public ForkJoinWorkTask(Long start, Long end){
            this.start = start;
            this.end = end;
        }
        
        @Override
        protected Long compute() {
            Long sum = 0l;
            if(end - start <= THRESHOLD){ //足够小的子任务,进行计算求和
                for(Long i = start; i < end; i++){
                    sum += i;
                }
            }else{ //任务拆分不满足,继续拆分(二叉分逻辑)
                Long middle = (start + end) / 2;
                ForkJoinWorkTask rightTask = new ForkJoinWorkTask(start, middle);
                rightTask.fork();
                ForkJoinWorkTask leftTask = new ForkJoinWorkTask(middle + 1, end);
                leftTask.fork();
                sum = rightTask.join() + leftTask.join();
            }
            return sum;
        }
        
    }
  • 相关阅读:
    is(C# 参考)
    索引器(C# 编程指南)
    修改IIS文件上传大小限制
    Sql Server判断某列字段是否为空或判断某列字段长度
    Linq分页查询
    H5网页播放器播不了服务器上的mp4视频文件
    [你必须知道的.NET] 第八回:品味类型---值类型与引用类型(上)-内存有理
    如何通过ildasm/ilasm修改assembly的IL代码
    Python实战之set学习笔记及简单练习
    Python实战之int学习笔记及简单练习
  • 原文地址:https://www.cnblogs.com/eric-fang/p/9004020.html
Copyright © 2011-2022 走看看