zoukankan      html  css  js  c++  java
  • 马士兵老师高并发编程之6大线程池

    Executor

    执行器,这是一个接口,内部维护了一个方法execute它负责执行一项任务。参数为Runnable,方法的具体实现由我们自己来执行。如下面的代码,我们既可以使用单纯的方法调用也可以新启一个新的线程去执行Runnable的run方法。

    1. import java.util.concurrent.Executor;
    2. public class T01_MyExecutor implements Executor {
    3. public static void main(String[] args) {
    4. new T01_MyExecutor().execute(()->System.out.println("hello executor"));
    5. }
    6. @Override
    7. public void execute(Runnable command) {
    8. //new Thread(command).run();
    9. command.run();
    10. }
    11. }

    ExecutorService

    代表着启动一系列的线程为用户提供服务(本质上也是一个执行器),比如说Java8的官方文档就举了一个网络接受连接池的例子(代码如下)。在这里ExecutorService就代表着一个的线程池对外提供接受网络请求的服务。同时它也是一系列线程池的接口比如说

    RorkJoinPool、ScheduledThreadPoolExecutor,、ThreadPoolExecutor等。同时它可以提交Callable与Runnable的对象返回一个未来的执行结果对象Future。这里顺便说一下,Callable是一个增强版的Runnable,它的call方法可以抛出异常可以有返回值。其中它的返回值放在了Future对象中,我们可以使用Future对象的get方法来获得返回值。

    1. class NetworkService implements Runnable {
    2. private final ServerSocket serverSocket;
    3. private final ExecutorService pool;
    4. public NetworkService(int port, int poolSize)
    5. throws IOException {
    6. serverSocket = new ServerSocket(port);
    7. pool = Executors.newFixedThreadPool(poolSize);
    8. }
    9. public void run() { // run the service
    10. try {
    11. for (;;) {
    12. pool.execute(new Handler(serverSocket.accept()));
    13. }
    14. } catch (IOException ex) {
    15. pool.shutdown();
    16. }
    17. }
    18. }
    19. class Handler implements Runnable {
    20. private final Socket socket;
    21. Handler(Socket socket) { this.socket = socket; }
    22. public void run() {
    23. // read and service request on socket
    24. }
    25. }

    除了以上方法来创建一个ExecutorService还可以使用Executors这个工具类来创建它,在这里我们可以把Executors理解为就像utils,collections的工具类。

    Future将来的结果

    Future常与Callable联合使用,Future可以获得Callable执行后的返回值。如果想新建一个线程执行一个这个Callable中的call方法而且获得返回值的话我们可以使用以下的思路。

    方案一:new Thread(new FutureTask(一个实现了Callable的类的对象)).start();使用FutureTask来接收任务的返回值。

    方案二:new一个线程池然后然后提交Callable的实现的对象。使用Future来获得Callable的返回值。具体实现如下:

    1. /**
    2. * 认识future
    3. */
    4. package yxxy.c_026;
    5. import java.util.concurrent.ExecutionException;
    6. import java.util.concurrent.ExecutorService;
    7. import java.util.concurrent.Executors;
    8. import java.util.concurrent.Future;
    9. import java.util.concurrent.FutureTask;
    10. import java.util.concurrent.TimeUnit;
    11. public class T06_Future {
    12. public static void main(String[] args) throws InterruptedException, ExecutionException {
    13. FutureTask<Integer> task = new FutureTask<>(()->{
    14. TimeUnit.MILLISECONDS.sleep(500);
    15. return 1000;
    16. }); //new Callable () { Integer call();}
    17. new Thread(task).start();
    18. System.out.println(task.get()); //阻塞
    19. //*******************************
    20. ExecutorService service = Executors.newFixedThreadPool(5);
    21. Future<Integer> f = service.submit(()->{
    22. TimeUnit.MILLISECONDS.sleep(500);
    23. return 1;
    24. });
    25. System.out.println(f.get());
    26. System.out.println(f.isDone());
    27. }
    28. }

    6大线程池的介绍

    FixedThreadPool

    一个固定大小的线程池运行以下程序得到相应的结果:

    1. public class T05_ThreadPool {
    2. public static void main(String[] args) throws InterruptedException {
    3. ExecutorService service = Executors.newFixedThreadPool(5); //execute submit
    4. for (int i = 0; i < 6; i++) {
    5. service.execute(() -> {
    6. try {
    7. TimeUnit.MILLISECONDS.sleep(500);
    8. } catch (InterruptedException e) {
    9. e.printStackTrace();
    10. }
    11. System.out.println(Thread.currentThread().getName());
    12. });
    13. }
    14. System.out.println(service);
    15. service.shutdown();
    16. System.out.println(service.isTerminated());
    17. System.out.println(service.isShutdown());
    18. System.out.println(service);
    19. TimeUnit.SECONDS.sleep(5);
    20. System.out.println(service.isTerminated());
    21. System.out.println(service.isShutdown());
    22. System.out.println(service);
    23. }
    24. }

    运行结果

    1. java.util.concurrent.ThreadPoolExecutor@1b28cdfa[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    2. false
    3. true
    4. java.util.concurrent.ThreadPoolExecutor@1b28cdfa[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    5. pool-1-thread-1
    6. pool-1-thread-2
    7. pool-1-thread-5
    8. pool-1-thread-4
    9. pool-1-thread-3
    10. pool-1-thread-1
    11. true
    12. true
    13. java.util.concurrent.ThreadPoolExecutor@1b28cdfa[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    • 整个程序new了一个5个线程的线程池,使用for循环向这个线程池抛了5个任务。它的执行原则是哪一个线程空闲就由哪个线程来执行这个任务。所以我们看到的线程池的线程序号是不固定的乱序的,但是它有个规则就是先执行完任务的线程会在新线程到来时优先分配到任务。
    • 线程池shutdown之后程序不会立刻停止而是要等待的所有线程都执行完毕之后再停止服务,所以我们看到的就是Runningà Shutting downà Terminated
    • 线程池的任务大体上分为两类,等待就绪队列与已完成任务的队列。通过输出结果我们可以看出在开始有5个正在执行的任务1个任务驻留在就绪队列等待执行,在执行结束后我们的已执行队列中就会有6个元素。

    CachedThreadPool

    CachedPool的主要特点就是如果新来的一个任务需要这个线程池来执行的话,如果当前线程池没有闲置的线程那么就新启动一个线程,如果有空闲线程那么就使用其中的一个空闲线程。就是这样的一个有弹性的线程池。默认情况下当一个线程空闲超过60s那么就会销毁,而且线程数量最大不能超过int类型的最大值或者是计算机内存的大小。以下代码展示了这样的特性:

    1. package yxxy.c_026;
    2. import java.util.concurrent.ExecutorService;
    3. import java.util.concurrent.Executors;
    4. import java.util.concurrent.TimeUnit;
    5. public class T08_CachedPool {
    6. public static void main(String[] args) throws InterruptedException {
    7. ExecutorService service = Executors.newCachedThreadPool();
    8. System.out.println(service);
    9. for (int i = 0; i < 2; i++) {
    10. service.execute(() -> {
    11. try {
    12. TimeUnit.MILLISECONDS.sleep(500);
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. System.out.println(Thread.currentThread().getName());
    17. });
    18. }
    19. System.out.println(service);
    20. TimeUnit.SECONDS.sleep(80);
    21. System.out.println(service);
    22. }
    23. }

    SingleThreadPool

    这个线程池中只有一个线程,那么你可能回会问这与单个线程有什么区别呢?: - ) 原因就是它可以被复用!它的使用场景就是当我们需要保证任务执行的先后顺序的时候就可以使用它。

    ScheduledThreadPool

    一个定时执行任务的一个线程池它所执行的任务的参数如下:

    1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    2. long initialDelay,
    3. long period,
    4. TimeUnit unit)

    initialDelay:在开始多少单位时间的时候执行第一个任务。

    Period:每隔多长时间执行下一个任务。

    Unit:时间的单位。

    它的底层基于DelayedWorkQueue。

    以下代码展示了已启动就开始执行的而且步幅为0.5s的线程执行方式:

    1. public class T10_ScheduledPool {
    2. public static void main(String[] args) {
    3. ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
    4. service.scheduleAtFixedRate(()->{
    5. try {
    6. TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. System.out.println(Thread.currentThread().getName());
    11. }, 0, 500, TimeUnit.MILLISECONDS);
    12. }
    13. }

    工作窃取线程池,一般情况下CPU是几核的就会启动几个线程,每一个线程都维护者自己的一个执行队列的,当某些线程将自己队列中的任务都执行完毕的时候就会去其他线程的队列中窃取任务来执行以此提高效率。它的底层是基于ForkJoinPool的,常常用于任务分配不均匀的场景中。

    需要注意的是,这个线程池产生的都是daemon的线程(后台线程),所以我们需要将主线程阻塞来观察输出结果。

    1. public class T11_WorkStealingPool {
    2. public static void main(String[] args) throws IOException {
    3. ExecutorService service = Executors.newWorkStealingPool();
    4. System.out.println(Runtime.getRuntime().availableProcessors());
    5. service.execute(new R(1000));
    6. service.execute(new R(2000));
    7. service.execute(new R(2000));
    8. service.execute(new R(2000)); //daemon
    9. service.execute(new R(2000));
    10. //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
    11. System.in.read();
    12. }
    13. static class R implements Runnable {
    14. int time;
    15. R(int t) {
    16. this.time = t;
    17. }
    18. @Override
    19. public void run() {
    20. try {
    21. TimeUnit.MILLISECONDS.sleep(time);
    22. } catch (InterruptedException e) {
    23. e.printStackTrace();
    24. }
    25. System.out.println(time + " " + Thread.currentThread().getName());
    26. }
    27. }
    28. }

    ForkJoinPool

    这个线程池设计的思想就与MapReduce极其相似,将一个大的任务分解成一个个小的任务当多个线程来执行。然后将计算的结果汇总得到最终结果。这也是用到了递归的思想。其中它的任务分为两种一种没有返回值是RecursiveAction,一种有返回值RecursiveTask。常常用于大量数据的运算以下为示例代码:

    1. package yxxy.c_026;
    2. import java.io.IOException;
    3. import java.util.Arrays;
    4. import java.util.Random;
    5. import java.util.concurrent.ForkJoinPool;
    6. import java.util.concurrent.RecursiveTask;
    7. public class T12_ForkJoinPool {
    8. static int[] nums = new int[1000000];
    9. static final int MAX_NUM = 50000;
    10. static Random r = new Random();
    11. static {
    12. for(int i=0; i<nums.length; i++) {
    13. nums[i] = r.nextInt(100);
    14. }
    15. System.out.println(Arrays.stream(nums).sum()); //stream api
    16. }
    17. /*
    18. static class AddTask extends RecursiveAction {
    19. int start, end;
    20. AddTask(int s, int e) {
    21. start = s;
    22. end = e;
    23. }
    24. @Override
    25. protected void compute() {
    26. if(end-start <= MAX_NUM) {
    27. long sum = 0L;
    28. for(int i=start; i<end; i++) sum += nums[i];
    29. System.out.println("from:" + start + " to:" + end + " = " + sum);
    30. } else {
    31. int middle = start + (end-start)/2;
    32. AddTask subTask1 = new AddTask(start, middle);
    33. AddTask subTask2 = new AddTask(middle, end);
    34. subTask1.fork();
    35. subTask2.fork();
    36. }
    37. }
    38. }
    39. */
    40. static class AddTask extends RecursiveTask<Long> {
    41. int start, end;
    42. AddTask(int s, int e) {
    43. start = s;
    44. end = e;
    45. }
    46. @Override
    47. protected Long compute() {
    48. if(end-start <= MAX_NUM) {
    49. long sum = 0L;
    50. for(int i=start; i<end; i++) sum += nums[i];
    51. return sum;
    52. }
    53. int middle = start + (end-start)/2;
    54. AddTask subTask1 = new AddTask(start, middle);
    55. AddTask subTask2 = new AddTask(middle, end);
    56. subTask1.fork();
    57. subTask2.fork();
    58. return subTask1.join() + subTask2.join();
    59. }
    60. }
    61. public static void main(String[] args) throws IOException {
    62. ForkJoinPool fjp = new ForkJoinPool();
    63. AddTask task = new AddTask(0, nums.length);
    64. fjp.execute(task);
    65. long result = task.join();
    66. System.out.println(result);
    67. //System.in.read();
    68. }
    69. }

    事实上利用普通的线程池也可以完成大量数据的并行运算代码如下:

    1. /**
    2. * 线程池的概念
    3. * nasa
    4. */
    5. package yxxy.c_026;
    6. import java.util.ArrayList;
    7. import java.util.List;
    8. import java.util.concurrent.Callable;
    9. import java.util.concurrent.ExecutionException;
    10. import java.util.concurrent.ExecutorService;
    11. import java.util.concurrent.Executors;
    12. import java.util.concurrent.Future;
    13. public class T07_ParallelComputing {
    14. public static void main(String[] args) throws InterruptedException, ExecutionException {
    15. long start = System.currentTimeMillis();
    16. List<Integer> results = getPrime(1, 200000);
    17. long end = System.currentTimeMillis();
    18. System.out.println(end - start);
    19. final int cpuCoreNum = 4;
    20. ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
    21. MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
    22. MyTask t2 = new MyTask(80001, 130000);
    23. MyTask t3 = new MyTask(130001, 170000);
    24. MyTask t4 = new MyTask(170001, 200000);
    25. Future<List<Integer>> f1 = service.submit(t1);
    26. Future<List<Integer>> f2 = service.submit(t2);
    27. Future<List<Integer>> f3 = service.submit(t3);
    28. Future<List<Integer>> f4 = service.submit(t4);
    29. start = System.currentTimeMillis();
    30. f1.get();
    31. f2.get();
    32. f3.get();
    33. f4.get();
    34. end = System.currentTimeMillis();
    35. System.out.println(end - start);
    36. }
    37. static class MyTask implements Callable<List<Integer>> {
    38. int startPos, endPos;
    39. MyTask(int s, int e) {
    40. this.startPos = s;
    41. this.endPos = e;
    42. }
    43. @Override
    44. public List<Integer> call() throws Exception {
    45. List<Integer> r = getPrime(startPos, endPos);
    46. return r;
    47. }
    48. }
    49. static boolean isPrime(int num) {
    50. for(int i=2; i<=num/2; i++) {
    51. if(num % i == 0) return false;
    52. }
    53. return true;
    54. }
    55. static List<Integer> getPrime(int start, int end) {
    56. List<Integer> results = new ArrayList<>();
    57. for(int i=start; i<=end; i++) {
    58. if(isPrime(i)) results.add(i);
    59. }
    60. return results;
    61. }
    62. }

    这是一个质数计算的问题,我们把质数计算划分为不同的数据段是因为越大的质数越难计算,所以直观上计算大量的小数字的质数的时间相当于计算少量的大数字花的时间。这样一来我们就将这个大的任务相对均匀的拆分开来避免了任务分配不均匀造成的等待(也就是时间浪费)。

    线程池的底层实现

    前四种线程池的底层源码如下:

    ChchedThreadPool

     

    FixedThreadPool

    ScheduledPool

    SingleThreadPool

    我们会发现他们都是基于ThreadExecutor。

    而WorkStealingPool与ForkJoinPool的底层都是ForkJoinPool。

     

    最后感谢马士兵老师,一个专心做教育的老师。

  • 相关阅读:
    SoC FPGA开发板的FPGA配置数据下载和固化
    字体解码
    ProxyApi-大数据采集用的IP代理池
    mongodb-to-mongodb
    mongodb分片
    kubernetes部署kube-scheduler服务
    kubernetes部署kube-controller-manager服务
    kubernetes部署 kube-apiserver服务
    kubernetes部署haproxy、keepalived为kube-apiserver做集群
    kubernetes部署Docker私有仓库Registry
  • 原文地址:https://www.cnblogs.com/jpfss/p/9913351.html
Copyright © 2011-2022 走看看