zoukankan      html  css  js  c++  java
  • Java线程池如何优雅地等待所有任务执行完

     https://blog.csdn.net/flycp/article/details/106337294

    ***Java多线程-线程池ThreadPoolExecutor的submit返回值Future     (要看)

    https://blog.csdn.net/qq_25806863/article/details/71214033 

    随着项目的体量越来越大,对代码的执行效率要求越来越高,在实际应用过程中我们会经常使用线程池。
    那么如果线程池嵌入在业务代码中,如何正确的等待线程池执行完,在执行后续操作呢?或者想要获取执行结果有应该怎么处理呢?

    下面走一下场景:

    package com.example.demo1.entity;

    /**
    * create by c-pown on 2019-12-06
    */
    public class Student {
    private String name;
    private Integer age;
    private Integer heigh;
    private String hoby;

    public Student(String name, Integer age, Integer heigh, String hoby) {
    this.name = name;
    this.age = age;
    this.heigh = heigh;
    this.hoby = hoby;
    }
    static String getAllname(){
    return "张三";
    }
    public String getName() {
    return name;
    }

    public void setName(String name) {
    this.name = name;
    }

    public Integer getAge() {
    return age;
    }

    public void setAge(Integer age) {
    this.age = age;
    }

    public Integer getHeigh() {
    return heigh;
    }

    public void setHeigh(Integer heigh) {
    this.heigh = heigh;
    }

    public String getHoby() {
    return hoby;
    }

    public void setHoby(String hoby) {
    this.hoby = hoby;
    }

    @Override
    public String toString() {
    return "Student{" +
    "name='" + name + '\'' +
    ", age=" + age +
    ", heigh=" + heigh +
    ", hoby='" + hoby + '\'' +
    '}';
    }
    }
     
    package com.example.demo1.service.TestThreadPool;

    import com.example.demo1.entity.Student;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    /**
    * create by c-pown on 2020-05-25
    */
    public class TestThreadPool {
    /**
    * 手动创建线程池
    */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20,25,100L,
    TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) {
    Student student = null;
    List<Student> students = new ArrayList<>();
    //添加五十万个学生元素
    for (int i = 0; i < 500000; i++) {
    student = new Student("name"+i,20,183,"玩");
    students.add(student);
    }
    for (Student student1 : students) {
    /**
    * 给元素添加后缀
    */
    executor.submit(()-> student1.setName(student1.getName()+"这是后缀"));
    }
    //查看添加情况
    System.out.println("添加数量:"+students.stream().filter(x->x.getName().contains("这是后缀")).count());
    }
    }
     
    我们给List里面添加500000个学生元素,然后使用线程池给name属性添加后缀,看一下执行结果:

    添加数量:475371

    我们发现添加成功的数量是少了两万多,这是由于线程池中的子线程任务没有执行完,而主线程已经开始执行业务代码,导致成功数量变少。
    下面我们修改一下代码:

    一、使用CountDownLatch
    package com.example.demo1.service.TestThreadPool;

    import com.example.demo1.entity.Student;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    /**
    * create by c-pown on 2020-05-25
    */
    public class TestThreadPool {
    /**
    * 手动创建线程池
    */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20,25,100L,
    TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) {
    Student student = null;
    List<Student> students = new ArrayList<>();
    //添加五十万个学生元素
    for (int i = 0; i < 500000; i++) {
    student = new Student("name"+i,20,183,"玩");
    students.add(student);
    }
    CountDownLatch countDownLatch = new CountDownLatch(students.size());
    for (Student student1 : students) {
    /**
    * 给元素添加后缀
    */
    executor.submit(()-> {
    try {
    student1.setName(student1.getName()+"这是后缀");
    } catch (Exception e) {
    e.printStackTrace();
    }finally {
    countDownLatch.countDown();
    }
    });
    }
    try {
    countDownLatch.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    //查看添加情况
    System.out.println("添加数量:"+students.stream().filter(x->x.getName().contains("这是后缀")).count());
    }
    }
     
    结果:

    添加数量:500000
    1
    这是一个计数器操作,在线程池执行之前,给计数器指定数值(与要执行代码的次数一致)也就是students.size(),在线程池执行代码体里面要加上countDownLatch.countDown();代表每执行一次数值减少一,最后在循环体外边写上countDownLatch.await();代表等待计数器归零。
    可以查看下源码介绍:

    /**
    * Decrements the count of the latch, releasing all waiting threads if
    * the count reaches zero.
    *
    * <p>If the current count is greater than zero then it is decremented.
    * If the new count is zero then all waiting threads are re-enabled for
    * thread scheduling purposes.
    *
    * <p>If the current count equals zero then nothing happens.
    */
    public void countDown() {
    sync.releaseShared(1);
    }
     
    /**
    * Causes the current thread to wait until the latch has counted down to
    * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
    *
    * <p>If the current count is zero then this method returns immediately.
    *
    * <p>If the current count is greater than zero then the current
    * thread becomes disabled for thread scheduling purposes and lies
    * dormant until one of two things happen:
    * <ul>
    * <li>The count reaches zero due to invocations of the
    * {@link #countDown} method; or
    * <li>Some other thread {@linkplain Thread#interrupt interrupts}
    * the current thread.
    * </ul>
    *
    * <p>If the current thread:
    * <ul>
    * <li>has its interrupted status set on entry to this method; or
    * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
    * </ul>
    * then {@link InterruptedException} is thrown and the current thread's
    * interrupted status is cleared.
    *
    * @throws InterruptedException if the current thread is interrupted
    * while waiting
    */
    public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
    }
     
    介绍中写到等待计数器数量减少直至为0为止。也可以给await()设置超时时间

    countDownLatch.await(300,TimeUnit.SECONDS);
    1
    如果超过300s(也可以是时,分)则不再等待,直接执行下面代码。

    二、使用Future.get()
    package com.example.demo1.service.TestThreadPool;

    import com.example.demo1.entity.Student;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    /**
    * create by c-pown on 2020-05-25
    */
    public class TestThreadPool {
    /**
    * 手动创建线程池
    */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20,25,100L,
    TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) {
    Student student = null;
    List<Student> students = new ArrayList<>();
    //添加五十万个学生元素
    for (int i = 0; i < 500000; i++) {
    student = new Student("name"+i,20,183,"玩");
    students.add(student);
    }
    List<Future> futures = new ArrayList<>();
    for (Student student1 : students) {
    /**
    * 给元素添加后缀
    */
    Future future = executor.submit(()-> {
    try {
    student1.setName(student1.getName()+"这是后缀");
    } catch (Exception e) {
    e.printStackTrace();
    }
    });
    futures.add(future);
    }
    for (Future future : futures) {
    try {
    future.get();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    }
    //查看添加情况
    System.out.println("添加数量:"+students.stream().filter(x->x.getName().contains("这是后缀")).count());
    }
    }
     
    结果:

    添加数量:500000
    1
    Future.get()可以同步等待线程执行完成,并且可以监听执行结果

    /**
    * Waits if necessary for the computation to complete, and then
    * retrieves its result.
    *
    * @return the computed result
    * @throws CancellationException if the computation was cancelled
    * @throws ExecutionException if the computation threw an
    * exception
    * @throws InterruptedException if the current thread was interrupted
    * while waiting
    */
    V get() throws InterruptedException, ExecutionException;
     
    源码中可以看出方法是有返回值得,可以监听线程池子线程执行状态及执行结果。
    直接return 结果 Future<?>添加泛型即可。
    同样的 Future.get()也是可以指定超时时间的,超过等待时间可以直接执行后续代码。

    最后 如果线程池是方法内部创建的,可以直接使用shutdown()也会等待线程池的执行结果。同时会关闭线程池资源。

    executor.shutdown();
    try {
    executor.awaitTermination(300,TimeUnit.SECONDS);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    —————————————————————————————————————
    原文链接:https://blog.csdn.net/flycp/article/details/106337294

  • 相关阅读:
    线程安全和非线程安全
    spring MVC和hibernate的结合
    Spring学习笔记1——基础知识 (转)
    bitset && Luogu 3674 小清新人渣的本愿
    luogu P3452 [POI2007]BIU-Offices
    每日刷题记录
    Codeforces Round #721 (Div. 2) B2. Palindrome Game (hard version)
    2019湘潭邀请赛A
    2021CCPC浙江省赛 B
    Codeforces Round #720 (Div. 2) D
  • 原文地址:https://www.cnblogs.com/kelelipeng/p/15670646.html
Copyright © 2011-2022 走看看