zoukankan      html  css  js  c++  java
  • Forkjoin线程池

      之前学习了ThreadPoolExecutor线程池的用法,现在学习下Forkjoin线程池的用法。主要也就是ForkJoinPool,该类和ThreadPoolExecutor是兄弟类,都继承自AbstractExecutorService。

    1.简介

    1.什么是Forkjoin以及适用场景  

      虽然目前处理器核心数已经发展到最大,但是按任务并发处理并不能完全充分的利用处理器资源,因为一般的应用程序没有那么多的并发任务。基于这种现象,考虑将一个任务拆分成多个单元,每个单元分别得到执行,最后合并每个单元的结果。

      Fork/join是Java7提供的一个用于并发执行任务的框架,是一个把多个大任务分成若干个小任务,最终汇总每个小任务结果得到大任务结果的框架。

       说白了就是充分利用多处理器的优势,让处理器充分利用起来,并发处理任务。

      如果需要查询CPU数量可以从任务管理器查看,也可以用java代码查看,如下:

    System.out.println(Runtime.getRuntime().availableProcessors());

    结果:

    4

    2.工作窃取算法

      一个大任务拆分成许多小任务,为了减小线程间的竞争,把这些子任务分别放到不同的队列中,并且每个队列都有单独的线程来执行任务,线程和队列一一对应。但是会出现一种情况:A线程处理完了自己队列的任务,但是B线程的队列还有很多任务要处理。如果A过去处理B队列的任务会访问同一个队列,造成竞争,解决办法就是A从双端队列的尾部拿任务、B从双端队列的头部拿任务。

     注意:每个线程都有自己的队列(这点和TjreadPoolExecutor不一样),当自己队列的任务完成以后会从其他线程的队列窃取一个任务执行,这样可以充分利用资源。

    优点:利用了线程并行计算,减少了线程间的竞争。

    缺点:

    (1)如果双端队列只有一个任务会产生竞争

    (2)窃取算法消耗了更多的系统资源,如会创建多个线程和多个双端队列。

    3.涉及的主要类

    1.ForkJoinTask

      它提供任务执行中fork和join操作的机制。一般只需要继承其子类:

    RecursiveTask:用于执行有返回结果的任务,这个泛型需要自己传。

    RecursiveAction:用于执行没有返回结果的任务,其实这个的泛型只不过是Void

    2.ForkJoinPool

      ForkJoinTask任务需要在ForkJoinPool中执行。默认的线程池数量等价于CPU数量(我的机子是4)。

    3.ForrkjoinWorkerThread

      ForkJoinPool中执行任务的线程。

    2.简单使用

    比如一个简单的使用,实现Forkjoin计算0-100000000000的和

    ForkJoinTask类:

      当需要计算的数值大于step的时候继续拆分任务,否则拆分任务。

    package forkjoin;
    
    import java.util.concurrent.RecursiveTask;
    
    /**
     * @author: 乔利强
     * @date: 2021/1/7 19:37
     * @description:
     */
    public class SumTask extends RecursiveTask<Long> {
    
        private long start, end;
    
        private long step = 1000000;
    
        public SumTask(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Long compute() {
            long sum = 0;
            if (end - start <= step) {
                for (long i = start; i <= end; i++)
                    sum += i;
            } else {
                long mid = start + (end - start) / 2;
                SumTask lt = new SumTask(start, mid);
                SumTask rt = new SumTask(mid + 1, end);
                lt.fork();
                rt.fork();
                long leftsum = lt.join();
                long rightsum = rt.join();
                sum = leftsum + rightsum;
            }
            
            return sum;
        }
    }

    测试类:测试单线程同步执行和Forkjoin框架执行

    package forkjoin;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    
    /**
     * @author: 乔利强
     * @date: 2021/1/7 19:32
     * @description:
     */
    public class SumClient {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 单线程测试
            long startNum = 0;
            long endNum = 100000000000L;
            long sum = 0;
            long startTime = System.currentTimeMillis();
            for (; startNum <= endNum; startNum++) {
                sum += startNum;
            }
            long endTime = System.currentTimeMillis();
            System.out.println("sum: " + sum + ", 用时: " + (endTime - startTime) + "ms");
    
            // fork-join测试
            startTime = System.currentTimeMillis();
            // 1. 创建线程池
            ForkJoinPool fp = new ForkJoinPool();
            // 2.创建一个任务
            SumTask task = new SumTask(0, endNum);
            // 3. 任务交给线程
            ForkJoinTask<Long> result = fp.submit(task);
            // 4.得到结果
            Long fkResult = result.get();
            endTime = System.currentTimeMillis();
            System.out.println("sum: " + fkResult + ", 用时: " + (endTime - startTime) + "ms");
        }
    }

    结果:(我在自己的电脑测的时候计算太耗时了就中断了。我在另一个电脑测的时候单线程大概是40多s、forkjoin大概是20多s)

    查看ForkJoinPool的默认构造,如下:可以创建的线程池的大小是CPU数量

        public ForkJoinPool() {
            this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
                 defaultForkJoinWorkerThreadFactory, null, false);
        }

    补充:并行流parallelStream底层也是用了ForkJoin,并行流适合没有线程安全问题、较单纯的数据处理任务

      parallelStream提供了流的并行处理,它是Stream的另一重要特性,其底层使用Fork/Join框架实现。简单理解就是多线程异步任务的一种实现。

    测试:

            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
            numbers.parallelStream().forEach(num -> {
                try {
                    Thread.sleep(1 * 100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ">>" + num);
            });

    结果: (可以看出打印的顺序是乱序证明确实是异步并行执行;打印的线程名字是ForkJoinPool.commonPool-worker-1到3 也证明是采用的ForkJoinPool)

    main>>6

    ForkJoinPool.commonPool-worker-1>>8

    ForkJoinPool.commonPool-worker-2>>3

    ForkJoinPool.commonPool-worker-3>>7

    main>>5

    ForkJoinPool.commonPool-worker-2>>4

    ForkJoinPool.commonPool-worker-1>>9

    ForkJoinPool.commonPool-worker-3>>2

    main>>1

    补充:可以通过修改VM参数-Djava.util.concurrent.ForkJoinPool.common.parallelism=N设置worker的数量,测试如下:

            System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "6");
    
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
            numbers.parallelStream().forEach(num -> {
                try {
                    Thread.sleep(1 * 100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ">>" + num);
            });

    结果:

    main>>6

    ForkJoinPool.commonPool-worker-2>>3

    ForkJoinPool.commonPool-worker-1>>8

    ForkJoinPool.commonPool-worker-6>>5

    ForkJoinPool.commonPool-worker-3>>7

    ForkJoinPool.commonPool-worker-5>>9

    ForkJoinPool.commonPool-worker-4>>2

    main>>4

    ForkJoinPool.commonPool-worker-2>>1

    补充:ForkJoin不一定就肯定比单线程处理块,这还要看你单线程串行处理每个任务的耗时情况;ForkJoin本身创建线程以及Fork、Join任务都是需要花费时间的,比如: 

    package forkjoin;
    
    import java.util.Arrays;
    import java.util.List;
    
    public class SumClient2 {
    
        public static void main(String[] args) {
            // 修改forkjoin默认的线程池大小(可以修改为比处理器数量多,但是没必要,一般和处理器数目相同就可以)
            System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
            // 模拟处理用时
            int handleTime = 2;
    
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16);
            long start = System.currentTimeMillis();
            numbers.parallelStream().forEach(num -> {
                try {
                    Thread.sleep(1 * handleTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ">>" + num);
            });
            long end = System.currentTimeMillis();
            System.out.println((end - start) + "ms ===");
    
            start = System.currentTimeMillis();
            numbers.stream().forEach(num -> {
                try {
                    Thread.sleep(1 * handleTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ">>" + num);
            });
            end = System.currentTimeMillis();
            System.out.println((end - start) + "ms ---");
        }
    }

    结果: 可以看出使用ForkJoinPool反而时间多于单线程,当 handleTime 增大的时候使用ForkJoinPool会快于单线程。

    main>>11
    ForkJoinPool.commonPool-worker-1>>1
    ForkJoinPool.commonPool-worker-8>>13
    ForkJoinPool.commonPool-worker-9>>6
    ForkJoinPool.commonPool-worker-4>>14
    ForkJoinPool.commonPool-worker-10>>5
    ForkJoinPool.commonPool-worker-6>>2
    ForkJoinPool.commonPool-worker-2>>15
    ForkJoinPool.commonPool-worker-13>>16
    ForkJoinPool.commonPool-worker-11>>3
    ForkJoinPool.commonPool-worker-15>>8
    ForkJoinPool.commonPool-worker-14>>7
    ForkJoinPool.commonPool-worker-3>>12
    ForkJoinPool.commonPool-worker-12>>4
    ForkJoinPool.commonPool-worker-5>>10
    ForkJoinPool.commonPool-worker-7>>9
    103ms ===
    main>>1
    main>>2
    main>>3
    main>>4
    main>>5
    main>>6
    main>>7
    main>>8
    main>>9
    main>>10
    main>>11
    main>>12
    main>>13
    main>>14
    main>>15
    main>>16
    49ms ---

  • 相关阅读:
    分别改动Cube每一个面的贴图UV(Unity3D开发之十八)
    解决MAC下PHP连接MYSQL错误Warning: mysql_connect(): No such file or directory in conn.php
    什么是IaaS,PaaS和SaaS及其区别
    /lib/dracut/hooks/shutdown/30-dm-shutdown.sh
    openstack instance bootmgr is missing 问题 修复
    openStack aio nova service-list neutron ext-list
    CentOS7 iso封装语句
    開始EEPlat之旅
    重构摘要4_构筑測试体系
    线程及其创建的方式
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14248708.html
Copyright © 2011-2022 走看看