zoukankan      html  css  js  c++  java
  • java并发编程基础——线程池

    线程池

    由于启动一个线程要与操作系统交互,所以系统启动一个新的线程的成本是比较高的。在这种情况下,使用线程池可以很好的提升性能,特别是程序中涉及创建大量生命周期很短暂的线程时。

    与数据库连接池类似,线程池在启动时就创建了大量的空闲的线程,程序将一个Runnable对象或者Callable对象传给线程池,线程池就会启动一个线程来执行他们的run()或call()方法,当方法执行结束后,线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一次执行。

    线程池还可以有效的控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池中的最大线程参数可以控制系统中并发线程数不超过此数。

    一、Executors工厂生成线程池

    在java5开始,增加了一个Executors工厂类来生产线程池,它包含如下几个静态方法来生产线程池:

    newCacheThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会别缓存在线程池中。

    newFixedThreadPool(int nThread):创建一个可重用的、具有固定线程数的线程池。

    newSingleThreadExecutor():创建一个只有单线程的线程池,相当与newFixedThreadPool(1)。

    newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在延迟后执行线程任务。

    newSigleScheduleExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。

    上面方法中,前3个返回ExecutorService对象线程池。后面两个返回ScheduleExecutorService对象线程池。ScheduleExecutorService是ExecutorService的子类,可以延迟执行线程。

    线程池小例子:

    package threadtest;
     
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
    public class ThreadTest implements Runnable  {
     
        private static int i = 0;
        private synchronized void incre() {
            i++;
            System.out.println(Thread.currentThread().getName() +" "+ i);
        }
        @Override
        public void run() {
     
            for(int j=0;j<5;j++) {
                incre();
            }
        }
        public static void main(String[] args) {
     
            //生产一个线程池
            ExecutorService es = Executors.newFixedThreadPool(6);
            ThreadTest tt = new ThreadTest();
            es.submit(tt);
            es.submit(tt);
            es.submit(tt);
            es.shutdown();
        }
    }

    结果:

    pool-1-thread-1 1
    pool-1-thread-1 2
    pool-1-thread-1 3
    pool-1-thread-1 4
    pool-1-thread-1 5
    pool-1-thread-2 6
    pool-1-thread-2 7
    pool-1-thread-2 8
    pool-1-thread-2 9
    pool-1-thread-2 10
    pool-1-thread-3 11
    pool-1-thread-3 12
    pool-1-thread-3 13
    pool-1-thread-3 14
    pool-1-thread-3 15

    二、ForkJoinPool

    java7提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的计算结果合并成总的计算结果。(跟进多核cpu时代)

    ForkJoinPool是ExecutorService接口的实现类,所以它是一种特殊的线程池

    ForkJoinPool有两个常用的构造器:

    ForkJoinPool(int parallelism):创建包含parallelism个并行线程的ForkJoinPool

    ForkJoinPool():以Runtime.availableProcessors()方法的返回值做为parallelism参数来创建ForkJoinPool。

    ForkJoinPool创建后就可以调用它的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法来执行指定任务了。其中ForkJoinTask代表一个可以并行、合并的任务。

    ForkJoinTask是一个抽象类,它还有两个抽象子类:RecursiveAction和RecursiveTask。其中RecursiveTask代表有返回值的任务,RecursiveAction代表无返回值的任务。

    下面程序利用ForkJoinPool执行RecursiveAction任务,打印一段数字

    package threadtest;
     
    import java.util.concurrent.RecursiveAction;
     
    /**
     * 继承RecursiveAction实现可分解的任务
     * @author rdb
     *
     */
    public class PrintTask extends RecursiveAction{
     
        //每个任务对多打印50个数
        private static final int THRESHOLD = 50;
        private int start;
        private int end;
        public PrintTask(int start,int end) {
            this.start = start;
            this.end = end ;
        }
        @Override
        protected void compute() {
            //打印数小于50开始打印,否则分解任务
            if(end - start < THRESHOLD) {
                for(int i = start;i<end;i++) {
                    System.out.println(Thread.currentThread().getName()+" "+ i);
                }
            }else {
                int mind = (end + start)/2;
                PrintTask left = new PrintTask(start, mind);
                PrintTask right = new PrintTask(mind, end);
                //分解任务
                left.fork();
                right.fork();
                 
            }
        }
    }
     
    package threadtest;
     
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;
     
    public class ThreadTest {
     
    public static void main(String[] args) throws InterruptedException {
             
            //构建ForkJoinPool
            ForkJoinPool pool = new ForkJoinPool();
            //提交任务
            pool.submit(new PrintTask(0, 300));
            pool.shutdown();
             
            //阻塞主线程,直到线程池关闭
            while(!pool.awaitTermination(2, TimeUnit.SECONDS)) {
                System.out.println("service not stop");
            }
            System.out.println("all thread complete");
        }
     
    }

    结果:从结果中可以看出,有个四个核在并行执行任务(电脑是4核处理器)

    ForkJoinPool-1-worker-1 262
    ForkJoinPool-1-worker-0 37
    ForkJoinPool-1-worker-3 187
    ForkJoinPool-1-worker-3 188
    ForkJoinPool-1-worker-3 189
    ForkJoinPool-1-worker-3 190
    ForkJoinPool-1-worker-3 191
    ForkJoinPool-1-worker-2 112
    ForkJoinPool-1-worker-2 113
    ForkJoinPool-1-worker-3 192
    ...
    ForkJoinPool-1-worker-0 259
    ForkJoinPool-1-worker-0 260
    ForkJoinPool-1-worker-1 296
    ForkJoinPool-1-worker-0 261
    ForkJoinPool-1-worker-1 297
    ForkJoinPool-1-worker-1 298
    ForkJoinPool-1-worker-1 299
    all thread complete

    下面程序利用ForkJoinPool执行RecursiveTask任务,求数组的和并返回

    package threadtest;
     
    import java.util.concurrent.RecursiveTask;
    /**
     * 继承RecursiveTask实现可分解的有返回值的任务
     * @author rdb
     *
     */
    public class SumTask extends RecursiveTask<Integer>{
     
        //定义每个任务最多求和10个元素
            private final int THRESHOLD = 10;
            private int[] array;
            private int start;
            private int end;
            public SumTask(int[] array,int start,int end) {
                this.array = array;
                this.start = start;
                this.end = end;
            }
            @Override
            protected Integer compute() {
                 int sum = 0;
                if((end - start) < THRESHOLD) {
                    for(int i = start;i<end;i++) {
                        sum += array[i];
                    }
                    return sum;
                }else {
                    int mind = (end + start)/2 ;
                    SumTask left = new SumTask(array, start, mind);
                    SumTask right = new SumTask(array, mind, end);
                    left.fork();
                    right.fork();
                    return left.join()+right.join();
                }
            }
     
    }
     
    package threadtest;
      
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
      
    public class ThreadTest {
      
        public static void main(String[] args) throws Exception {
            int[] array = new int[100] ;
            int total = 0;
             
            for(int i = 0;i<array.length;i++) {
                array[i] = (int) (Math.random() * 10);
                total += array[i];
            }
            System.out.println(total);
            //JAVA8新加的通用池
            ForkJoinPool pool = ForkJoinPool.commonPool();
            Future<Integer> future = pool.submit(new SumTask(array, 0, array.length));
            System.out.println(future.get());
        }     
    }

    结果

    437
    437
  • 相关阅读:
    Ubuntu18.04 环境下 解决VScode中空格长度减小的问题
    IPython notebook(Jupyter notebook) 设置密码
    IPython notebook(Jupyter notebook)指定IP和端口运行
    ubuntu系统下 vscode中如何指定conda环境
    《Bitcoin: A Peer-to-Peer Electronic Cash System》 中本聪写的比特币白皮书
    QT-vs各个版本的编译器号对应的vs版本号
    惯性导航的组成
    论文引用格式
    SCI正刊和特刊(专刊/增刊)的区别是什么?
    python glob.glob() 函数
  • 原文地址:https://www.cnblogs.com/jnba/p/10636683.html
Copyright © 2011-2022 走看看