zoukankan      html  css  js  c++  java
  • java并发编程基础——线程池

    线程池

    由于启动一个线程要与操作系统交互,所以系统启动一个新的线程的成本是比较高的。在这种情况下,使用线程池可以很好的提升性能,特别是程序中涉及创建大量生命周期很短暂的线程时。

    与数据库连接池类似,线程池在启动时就创建了大量的空闲的线程,程序将一个Runnable对象或者Callable对象传给线程池,线程池就会启动一个线程来执行他们的run()或call()方法,当方法执行结束后,线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一次执行。

    线程池还可以有效的控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池中的最大线程参数可以控制系统中并发线程数不超过此数。

    一、Executors工厂生成线程池

    在java5开始,增加了一个Executors工厂类来生产线程池,它包含如下几个静态方法来生产线程池:

    newCacheThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会别缓存在线程池中。

    newFixedThreadPool(int nThread):创建一个可重用的、具有固定线程数的线程池。

    newSingleThreadExecutor():创建一个只有单线程的线程池,相当与newFixedThreadPool(1)。

    newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在延迟后执行线程任务。

    newSigleScheduleExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。

    上面方法中,前3个返回ExecutorService对象线程池。后面两个返回ScheduleExecutorService对象线程池。ScheduleExecutorService是ExecutorService的子类,可以延迟执行线程。

    线程池小例子:

    package threadtest;
     
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
    public class ThreadTest implements Runnable  {
     
        private static int i = 0;
        private synchronized void incre() {
            i++;
            System.out.println(Thread.currentThread().getName() +" "+ i);
        }
        @Override
        public void run() {
     
            for(int j=0;j<5;j++) {
                incre();
            }
        }
        public static void main(String[] args) {
     
            //生产一个线程池
            ExecutorService es = Executors.newFixedThreadPool(6);
            ThreadTest tt = new ThreadTest();
            es.submit(tt);
            es.submit(tt);
            es.submit(tt);
            es.shutdown();
        }
    }

    结果:

    pool-1-thread-1 1
    pool-1-thread-1 2
    pool-1-thread-1 3
    pool-1-thread-1 4
    pool-1-thread-1 5
    pool-1-thread-2 6
    pool-1-thread-2 7
    pool-1-thread-2 8
    pool-1-thread-2 9
    pool-1-thread-2 10
    pool-1-thread-3 11
    pool-1-thread-3 12
    pool-1-thread-3 13
    pool-1-thread-3 14
    pool-1-thread-3 15

    二、ForkJoinPool

    java7提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的计算结果合并成总的计算结果。(跟进多核cpu时代)

    ForkJoinPool是ExecutorService接口的实现类,所以它是一种特殊的线程池

    ForkJoinPool有两个常用的构造器:

    ForkJoinPool(int parallelism):创建包含parallelism个并行线程的ForkJoinPool

    ForkJoinPool():以Runtime.availableProcessors()方法的返回值做为parallelism参数来创建ForkJoinPool。

    ForkJoinPool创建后就可以调用它的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法来执行指定任务了。其中ForkJoinTask代表一个可以并行、合并的任务。

    ForkJoinTask是一个抽象类,它还有两个抽象子类:RecursiveAction和RecursiveTask。其中RecursiveTask代表有返回值的任务,RecursiveAction代表无返回值的任务。

    下面程序利用ForkJoinPool执行RecursiveAction任务,打印一段数字

    package threadtest;
     
    import java.util.concurrent.RecursiveAction;
     
    /**
     * 继承RecursiveAction实现可分解的任务
     * @author rdb
     *
     */
    public class PrintTask extends RecursiveAction{
     
        //每个任务对多打印50个数
        private static final int THRESHOLD = 50;
        private int start;
        private int end;
        public PrintTask(int start,int end) {
            this.start = start;
            this.end = end ;
        }
        @Override
        protected void compute() {
            //打印数小于50开始打印,否则分解任务
            if(end - start < THRESHOLD) {
                for(int i = start;i<end;i++) {
                    System.out.println(Thread.currentThread().getName()+" "+ i);
                }
            }else {
                int mind = (end + start)/2;
                PrintTask left = new PrintTask(start, mind);
                PrintTask right = new PrintTask(mind, end);
                //分解任务
                left.fork();
                right.fork();
                 
            }
        }
    }
     
    package threadtest;
     
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;
     
    public class ThreadTest {
     
    public static void main(String[] args) throws InterruptedException {
             
            //构建ForkJoinPool
            ForkJoinPool pool = new ForkJoinPool();
            //提交任务
            pool.submit(new PrintTask(0, 300));
            pool.shutdown();
             
            //阻塞主线程,直到线程池关闭
            while(!pool.awaitTermination(2, TimeUnit.SECONDS)) {
                System.out.println("service not stop");
            }
            System.out.println("all thread complete");
        }
     
    }

    结果:从结果中可以看出,有个四个核在并行执行任务(电脑是4核处理器)

    ForkJoinPool-1-worker-1 262
    ForkJoinPool-1-worker-0 37
    ForkJoinPool-1-worker-3 187
    ForkJoinPool-1-worker-3 188
    ForkJoinPool-1-worker-3 189
    ForkJoinPool-1-worker-3 190
    ForkJoinPool-1-worker-3 191
    ForkJoinPool-1-worker-2 112
    ForkJoinPool-1-worker-2 113
    ForkJoinPool-1-worker-3 192
    ...
    ForkJoinPool-1-worker-0 259
    ForkJoinPool-1-worker-0 260
    ForkJoinPool-1-worker-1 296
    ForkJoinPool-1-worker-0 261
    ForkJoinPool-1-worker-1 297
    ForkJoinPool-1-worker-1 298
    ForkJoinPool-1-worker-1 299
    all thread complete

    下面程序利用ForkJoinPool执行RecursiveTask任务,求数组的和并返回

    package threadtest;
     
    import java.util.concurrent.RecursiveTask;
    /**
     * 继承RecursiveTask实现可分解的有返回值的任务
     * @author rdb
     *
     */
    public class SumTask extends RecursiveTask<Integer>{
     
        //定义每个任务最多求和10个元素
            private final int THRESHOLD = 10;
            private int[] array;
            private int start;
            private int end;
            public SumTask(int[] array,int start,int end) {
                this.array = array;
                this.start = start;
                this.end = end;
            }
            @Override
            protected Integer compute() {
                 int sum = 0;
                if((end - start) < THRESHOLD) {
                    for(int i = start;i<end;i++) {
                        sum += array[i];
                    }
                    return sum;
                }else {
                    int mind = (end + start)/2 ;
                    SumTask left = new SumTask(array, start, mind);
                    SumTask right = new SumTask(array, mind, end);
                    left.fork();
                    right.fork();
                    return left.join()+right.join();
                }
            }
     
    }
     
    package threadtest;
      
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
      
    public class ThreadTest {
      
        public static void main(String[] args) throws Exception {
            int[] array = new int[100] ;
            int total = 0;
             
            for(int i = 0;i<array.length;i++) {
                array[i] = (int) (Math.random() * 10);
                total += array[i];
            }
            System.out.println(total);
            //JAVA8新加的通用池
            ForkJoinPool pool = ForkJoinPool.commonPool();
            Future<Integer> future = pool.submit(new SumTask(array, 0, array.length));
            System.out.println(future.get());
        }     
    }

    结果

    437
    437
  • 相关阅读:
    算法竞赛入门经典习题2-3 韩信点兵
    ios入门之c语言篇——基本函数——5——素数判断
    ios入门之c语言篇——基本函数——4——数值交换函数
    144. Binary Tree Preorder Traversal
    143. Reorder List
    142. Linked List Cycle II
    139. Word Break
    138. Copy List with Random Pointer
    137. Single Number II
    135. Candy
  • 原文地址:https://www.cnblogs.com/jnba/p/10636683.html
Copyright © 2011-2022 走看看