zoukankan      html  css  js  c++  java
  • Java高并发编程(四)

      一、Executor执行器

      1.Executor接口,java线程池框架中的顶层接口,提供一个execute方法来执行任务

    import java.util.concurrent.Executor;
    
    public class T01_MyExecutor implements Executor {
    
        public static void main(String[] args) {
            new T01_MyExecutor().execute(()->System.out.println("hello executor"));
        }
    
        @Override
        public void execute(Runnable command) {
            //new Thread(command).run();//另起线程方法调用
            command.run();//方法直接调用
        }
    
    }

      2.任务接口

        1)callable接口:提供一个call方法,具有返回值可以抛出异常

        2)runnable接口:提供一个run方法,无返回值不可抛出异常

      3.ExecutorService接口(Executor的一个子接口)

        1)ExecutorService可以理解为服务,执行execute方法可以向服务器中添加runnable任务无返回值,而执行submit方法可以向服务中添加callable与runnable任务,且有返回值

        2)Executor与ExecutorService的简单理解:可以想象许多的执行器Executor在等待着执行任务,而service服务可通过调用execute或submit方法来添加不同的任务供执行器执行

        3)submit方法具有一个future类型的返回值,与callable,Executors一起使用

    public class T06_Future {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            
            FutureTask<Integer> task = new FutureTask<>(()->{
                TimeUnit.MILLISECONDS.sleep(500);
                return 1000;
            }); //new Callable () { Integer call();}
            
            new Thread(task).start();
            
            System.out.println(task.get()); //阻塞
            
            //*******************************
            ExecutorService service = Executors.newFixedThreadPool(5);
            Future<Integer> f = service.submit(()->{
                TimeUnit.MILLISECONDS.sleep(500);
                return 1;
            });
            System.out.println(f.get());
            System.out.println(f.isDone());
            
        }
    }

    运行结果:

      4.Executors(操作Executor的一个工具类、工厂类)

      二、ThreadPool线程池

      1.线程池将任务分配给池中线程,当线程池中线程执行不过来时,任务会进入等待队列,队列由BlockingQueue实现,当有线程空闲时再来任务时会被分配给空闲线程,一个线程同时维护着两个队列(结束队列与等待队列)

      下面一个程序帮助理解线程池的概念

    public class T05_ThreadPool {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newFixedThreadPool(5); //execute submit
            for (int i = 0; i < 6; i++) {
                service.execute(() -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                });
            }
            System.out.println(service);
            
            service.shutdown();//正常关闭,等待所有任务执行完毕,关闭线程池
            System.out.println(service.isTerminated());//判断所有任务是否都执行完了
            System.out.println(service.isShutdown());//判断线程池是否被关闭了
            System.out.println(service);
            
            TimeUnit.SECONDS.sleep(5);
            System.out.println(service.isTerminated());
            System.out.println(service.isShutdown());
            System.out.println(service);
        }
    }

    运行结果:

      2.六种线程池

        1)FixedThreadPool  固定线程数量的线程池

        2)cachedThreadPool  假定刚开始一个线程都没有,来一个任务开启一个线程,如果再来一个任务,线程池中有空闲线程就将任务分配给空闲线程,如此往复直到起到电脑能支撑限度为止,默认一个线程空闲超过一分钟就自动销毁

        3)singleThreadPool  线程池中只有一个线程

        4)scheduledThreadPool  定时器线程池

        5)workStealingPool  工作窃取线程池,当前线程池有线程空闲,会自动去寻找任务执行,默认根据CPU核数启动默认线程数目线程,是精灵(demon)线程(守护线程、后台线程),主函数不阻塞的话看不到输出,本质上是ForkJoinPool实现的

        6)ForkJoinPool  分叉合并线程,大任务可以切分成许多的小任务,如果小任务还是太大的话还可以继续分,分的可以了就将小任务合并,最后产生一个总的结果(有点像归并排序)

           ForkJoinTask从RecursiveAction和RecursiveTask继承

          a.RecursiveAction  无返回值

    /**
     * 对数组中所有数求和
     * @author zhangqi
     *
     */
    public class T12_ForkJoinPool {
        static int[] nums = new int[1000000];
        static final int MAX_NUM = 50000;
        static Random r = new Random();
        
        static {
            for(int i=0; i<nums.length; i++) {
                nums[i] = r.nextInt(100);
            }
            
            System.out.println(Arrays.stream(nums).sum()); //stream api 
        }
        
        
        static class AddTask extends RecursiveAction { 
            
            int start, end;
            
            AddTask(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected void compute() {
                
                if(end-start <= MAX_NUM) {
                    long sum = 0L;
                    for(int i=start; i<end; i++) sum += nums[i];
                    System.out.println("from:" + start + " to:" + end + " = " + sum);
                } else {
                
                    int middle = start + (end-start)/2;
                    
                    AddTask subTask1 = new AddTask(start, middle);
                    AddTask subTask2 = new AddTask(middle, end);
                    subTask1.fork();
                    subTask2.fork();
                }
                
                
            }
            
        }
        
        
        /*static class AddTask extends RecursiveTask<Long> { 
            
            int start, end;
            
            AddTask(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected Long compute() {
                
                if(end-start <= MAX_NUM) {
                    long sum = 0L;
                    for(int i=start; i<end; i++) sum += nums[i];
                    return sum;
                } 
                
                int middle = start + (end-start)/2;
                
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
                
                return subTask1.join() + subTask2.join();
            }
            
        }*/
        
        public static void main(String[] args) throws IOException {
            ForkJoinPool fjp = new ForkJoinPool();
            AddTask task = new AddTask(0, nums.length);
            fjp.execute(task);
            //long result = task.join();
            //System.out.println(result);
            
            System.in.read();
            
        }
    }

    运行结果:

          b.RescursiveTask  有返回值的递归任务

    /**
     * 对数组中所有数求和
     * @author zhangqi
     *
     */
    public class T12_ForkJoinPool {
        static int[] nums = new int[1000000];
        static final int MAX_NUM = 50000;
        static Random r = new Random();
        
        static {
            for(int i=0; i<nums.length; i++) {
                nums[i] = r.nextInt(100);
            }
            
            System.out.println(Arrays.stream(nums).sum()); //stream api 
        }
        
        
    /*    static class AddTask extends RecursiveAction { 
            
            int start, end;
            
            AddTask(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected void compute() {
                
                if(end-start <= MAX_NUM) {
                    long sum = 0L;
                    for(int i=start; i<end; i++) sum += nums[i];
                    System.out.println("from:" + start + " to:" + end + " = " + sum);
                } else {
                
                    int middle = start + (end-start)/2;
                    
                    AddTask subTask1 = new AddTask(start, middle);
                    AddTask subTask2 = new AddTask(middle, end);
                    subTask1.fork();
                    subTask2.fork();
                }
                
                
            }
            
        }*/
        
        
        static class AddTask extends RecursiveTask<Long> { 
            
            int start, end;
            
            AddTask(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected Long compute() {
                
                if(end-start <= MAX_NUM) {
                    long sum = 0L;
                    for(int i=start; i<end; i++) sum += nums[i];
                    return sum;
                } 
                
                int middle = start + (end-start)/2;
                
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
                
                return subTask1.join() + subTask2.join();
            }
            
        }
        
        public static void main(String[] args) throws IOException {
            ForkJoinPool fjp = new ForkJoinPool();
            AddTask task = new AddTask(0, nums.length);
            fjp.execute(task);
            long result = task.join();
            System.out.println(result);
            
            //System.in.read();
            
        }
    }

    运行结果:

      补充:

      ThreadPoolExecutor线程池执行器(自定义线程池,许多线程池的底层实现,ForkJoinPool不是由它实现)

  • 相关阅读:
    4、Java基本数据类型
    3、Java 对象和类
    2、Java 基础语法标识符、修饰符、变量、 数组、枚举、关键字
    1、Java 开发环境配置
    近期目标
    Java泛型是什么?实战demo
    Java高级篇XML和正则表达式
    Java高级篇反射和注解
    Java高级篇 JVM
    JavaScript执行顺序
  • 原文地址:https://www.cnblogs.com/ghoster/p/7485674.html
Copyright © 2011-2022 走看看