zoukankan      html  css  js  c++  java
  • Java线程池及其底层源码实现分析

    1、相关类

      Executors  ExecutorService   Callable   ThreadPool     Future

    2、相关接口

      Executor

    Executor接口的使用:

      

    public class TestExecutor implements Executor{
        @Override
        public void execute(Runnable command){
            //调用execute方法常常传入runnable接口对象,开启线程
        }
    
    }
    

      ExecutorService接口的使用:(继承Executor接口)

    /**
    *submit方法(执行runnble、callable对象的线程)
    *实现类:各种线程池
    */
    Callable接口 && Runnable接口
    callable调用call方法
    runnable调用run方法
    都可以被线程调用,但callable的call方法具有返回值(泛型)
    Executors类(操作Executor的工具类)
    ExecutorService service = Executors.newFixedThreadPool(5);//创建5个线程的线程池
    ThreadPool线程池类(装着线程的容器)
    线程池创建的固定线程,线程任务执行完后线程不会消失,处于等待任务的状态(idel)。
        线程任务大于线程池容量时,多出来的任务放在等待队列中(内部使用BlockingQueue实现)
    public class TestThreadPool{
    2
        public static void main(String[] args){
    3
            ExecutorService service = Executors.newFixedThreadPool(5);//1.创建5个线程的线程池容器
    4
            
    5
            for(int i=0;i<6;i++){//2.放6个任务,线程池一次只能放5个,所以第6个任务需要重复使用旧的线程
    6
                service.execute(() -> {
    7
                    System.out.println(Thread.getCurrentThread().getName());//3.打印出当前线程名
    8
                });
    9
            }
    10
            service.shutdown();//执行完当前任务则关闭线程
    11
            service.shutdownNow();//无论是否执行完都关闭线程
    12
        }
    13
    }
    

      Future接口(线程未来产生的返回值)

    public class TestFuture{
    2
        public static void main(String[] args){
    3
            //FutureTask实现类Runnable和Future接口
    4
            FutureTask<Integer> task = new FutureTask<>(
    5
                Thread.sleep(500);//阻塞等待500毫秒
    6
                return 1000;
    7
            );
    8
            
    9
            //new的方式启动线程任务
    10
            new Thread(task).start();
    11
            System.out.println(task.get());//阻塞等待500毫秒后得到返回值
    12
            ///////////////////////////////////////////////////////////////////
    13
            ExecutorService service = Executors.newFixedThreadPool(5);//创建5个线程的线程池
    14
            Future<Integer> future = service.submit(()->{//相当于运行类callable接口的call方法,返回1
    15
                Thread.sleep(500);
    16
                return 1;
    17
            });
    18
            System.out.println(future.get());//阻塞等待500毫秒后得到返回值
    19
        }
    20
    }
    

      WorkStealingPool偷任务线程池

      底层采用ForkJoinPool实现(开启的是Deamon守护线程,主线程退出则线程退出)

      

    public class WorkStealingPoolTest {
    2
        public static void main(String[] args) throws IOException {
    3
            //根据CPU核数启动相应个数的线程(4核cpu---4个线程)
    4
            ExecutorService service = Executors.newWorkStealingPool();
    5
            System.out.println(Runtime.getRuntime().availableProcessors());
    6
    7
            service.execute(new R(1000));//线程1执行任务1----1秒
    8
            service.execute(new R(2000));//线程2执行任务2----2秒
    9
            service.execute(new R(2000));//线程3执行任务3----2秒
    10
            service.execute(new R(2000));//线程4执行任务4----2秒
    11
            service.execute(new R(2000));//任务5阻塞,当线程1执行完后把任务5偷过来执行
    12
            
    13
            //由于产生的是守护线程,主线程不阻塞的话,看不到输出
    14
            System.in.read();//将主线程阻塞 
    15
        }
    16
    17
        static class R implements Runnable {
    18
    19
            int time;
    20
    21
            R(int t) {
    22
                this.time = t;
    23
            }
    24
    25
            @Override
    26
            public void run() {
    27
                
    28
                try {
    29
                    TimeUnit.MILLISECONDS.sleep(time);
    30
                } catch (InterruptedException e) {
    31
                    e.printStackTrace();
    32
                }
    33
                //打印线程名---ForkJoinPool
    34
                System.out.println(time  + " " + Thread.currentThread().getName());
    35
                
    36
            }
    37
    38
        }
    39
    }
    

      ForkJoinPool(分支合并线程池)

      思想:分治,把大任务拆分成小任务并行计算,计算完成后将结果合并

      守护线程

      

    public class ForkJoinPoolTest{
    2
            
    3
            public static void main(String[] args) throws Exception {
    4
            ForkJoinPool pool = new ForkJoinPool();
    5
            MyTask task = new MyTask(inits, 0, inits.;ength-1);
    6
            ForkJoinTask<int[]> taskResult = pool.submit(task);
    7
            try {
    8
                taskResult.get();//阻塞等待所有线程结果计算完成
    9
            } catch (InterruptedException | ExecutionException e) {
    10
                e.printStackTrace(System.out);
    11
            }
    12
        }
    13
    14
        /**
    15
         * 单个排序的子任务
    16
         */
    17
        static class MyTask extends RecursiveTask<int[]> {
    18
    19
            private int[] source;
    20
            private int start;
    21
            private int end;
    22
    23
            public MyTask(int[] source,int start, int end ) {
    24
                this.source = source;
    25
                this.start = start;
    26
                this.end = end;
    27
            }
    28
    29
    30
            @Override
    31
            protected int[] compute() {
    32
                //长度小于50,进行计算
    33
                if(source.length <= 50) {
    34
                    long sum = 0L;
    35
                    for(int i=start; i<end; i++) sum += nums[i];
    36
                    return sum;
    37
                } 
    38
                //长度大于50,继续划分子任务
    39
                int middle = start + (end-start)/2;
    40
                
    41
                AddTask subTask1 = new MyTask(source,start,middle);
    42
                AddTask subTask2 = new MyTask(source,middle,end);
    43
                subTask1.fork();//递归创建子任务线程
    44
                subTask2.fork();
    45
                
    46
                //计算完成后将两个子任务的结果合并
    47
                return subTask1.join() + subTask2.join();
    48
            }
    49
        }
    50
    }
    

      各种线程池的底层实现:

      一、基本线程池:

      FixedThreadPool
           CachedThreadPool
           ScheduledThreadPool
           SingleThreadPool
      二、底层创建线程池都是使用ThreadPoolExecutor类实现的,而放置任务、执行任务使用了生产者消费者模型(阻塞队列的方式)
      三、源码分析
      ThreadPoolExecutor的API:
      
    ThreadPoolExecutor(int corePoolSize,//核心线程数(最小)
    
                       int maximumPoolSize,//最大线程数
    
                       long keepAliveTime, //线程运行时间
    
                       TimeUnit unit, //时间单位
    
                       BlockingQueue<Runnable> workQueue)//底层采用哪种阻塞队列来放线程任务 
    

      各种线程池的底层实现:

      

    //FixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads){
         return new ThreadPoolExecutor(nThreads,//初始线程数自定义
                        nThreads,//最大线程数自定义
                        0L, TimeUnit.SECONDS,//一旦启动线程池,线程永远不消失
                        new LinkedBlockingQueue<Runnable>());//链表阻塞队列
        }
    

      

    //CachedThreadPool(采用同步阻塞队列装任务,队列中有任务则启动新线程执行,没任务就阻塞)
    public static ExecutorService newCachedThreadPool(){
                        return new ThreadPoolExecutor(0,//初始为0个线程
                        Integer.MAX_VALUE,//可以启动无限多线程
                        60L, TimeUnit.SECONDS,//60秒空闲则结束
                        new SynchronousQueue<Runnable>());//同步阻塞队列,有任务马上开新线程执行(容量用于为0)
    }
    

      

    //SingleThreadPool
        return new ThreadPoolExecutor(1,//初始线程数为1
                    1,//最大线程数为1
                     0L, TimeUnit.SECONDS,//一旦启动线程池,线程永远不消失
                     new LinkedBlockingQueue<Runnable>());//链表阻塞队列
    }
    

      

    //ScheduledThreadPool
        public newScheduledThreadPool(int corePoolSize){
                        super(corePoolSize,//初始线程数自定义
                        Integer.MAX_VALUE,//无限多线程数
                        0, NANOSECONDS,//一旦启动线程池,线程永远不消失
                        new DelayedWorkQueue<Runnable>());//延时阻塞队列,隔一段时间执行一次任务
    }
    

      

      
     
  • 相关阅读:
    python中的 ' ' 和 " "
    Socket编程, 在server端read()函数调用后显示错误:Transport endpoint is not connected (犯了低级错误)
    我也终于申请了自己的博客。
    山之高
    策略模式-Java实现
    SQL事务的四种隔离级别和MySQL多版本并发控制
    用JAMES实现自己的邮件服务器
    windows mysql 自动备份的几种方法
    Hibernate常见配置详细解释
    慕课网-Linux达人养成计划学习笔记
  • 原文地址:https://www.cnblogs.com/hanxue112253/p/9626839.html
Copyright © 2011-2022 走看看