zoukankan      html  css  js  c++  java
  • 【重要】4线程汇总-线程间通信方式

    1  callable和future


    http://blog.csdn.net/zy_281870667/article/details/72047325

    一般情况,我们实现多线程都是Thread或者Runnable(后者比较多),但是,这两种都是没返回值的,所以我们需要使用callable(有返回值的多线程)和future(获得线程的返回值)来实现了。

     

    public class TestThread {  
        public static void main(String[] args) {  
            ThreadCount tc = null;  
            ExecutorService es = Executors.newCachedThreadPool();//线程池  
            CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es);  
            for(int i=0;i<4;i++){  
                tc = new ThreadCount(i+1);  
                cs.submit(tc);  
            }  
              
            // 添加结束,及时shutdown,不然主线程不会结束  
            es.shutdown();  
              
            int total = 0;  
            for(int i=0;i<4;i++){  
                try {  
                    total+=cs.take().get();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                } catch (ExecutionException e) {  
                    e.printStackTrace();  
                }  
            }  
              
            System.out.println(total);  
        }  
    }  
      
    class ThreadCount implements Callable<Integer>{  
        private int type;  
        ThreadCount(int type){  
            this.type = type;  
        }  
        @Override  
        public Integer call() throws Exception {  
            if(type==1){  
                System.out.println("C盘统计大小");  
                return 1;  
            }else if(type==2){  
                Thread.sleep(20000);  
                System.out.println("D盘统计大小");  
                return 2;  
            }else if(type==3){  
                System.out.println("E盘统计大小");  
                return 3;  
            }else if(type==4){  
                System.out.println("F盘统计大小");  
                return 4;  
            }  
            return null;  
        }  
    }  


     

    ps:一个需要注意的小细节,cs.take.get()获取返回值(这里阻塞?),是按照完成的顺序的,即上面案例返回顺序是CEFD

    tips:

    ExecutorCompletionService 是将 Executor和BlockQueue结合的jdk类,其实现的主要目的是:提交任务线程,每一个线程任务直线完成后,将返回值放在阻塞队列中,然后可以通过阻塞队列的take()方法返回 对应线程的执行结果!!


     

    2. join阻塞——直接用join把线程5加入进去即可

    http://blog.csdn.net/u011971132/article/details/62444282

    直接用join把线程5加入进去即可

    public static void main(String[] args) throws InterruptedException
        {
            Thread t1 = new Thread(new Worker("thread-1"));
            Thread t2 = new Thread(new Worker("thread-2"));
            Thread t3 = new Thread(new Worker("thread-3"));
            Thread t4 = new Thread(new Worker("thread-4"));
            Thread t5 = new Thread(new Worker("thread-5"));
    
            t1.start();t2.start();t3.start();t4.start();
            t1.join();t2.join();t3.join();t4.join();
    
            t5.start();
            t5.join();
        }

    或者,在t5的run:

    t1.start();t2.start();t3.start();t4.start(); t1.join();t2.join();t3.join();t4.join();

    t5线程中等待四个线程返回




    3.用java.util.concurrent下的方法解决

    http://blog.csdn.net/wenwen360360/article/details/62104612


    CountDownLatch : 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行

    CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.

    盗用别人的一个例子:

     

    [html] view plain copy
     
    1. public class CountDownLatchDemo {    
    2.     final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");    
    3.     public static void main(String[] args) throws InterruptedException {    
    4.         CountDownLatch latch=new CountDownLatch(2);//两个工人的协作    
    5.         Worker worker1=new Worker("zhang san", 5000, latch);    
    6.         Worker worker2=new Worker("li si", 8000, latch);    
    7.         worker1.start();//    
    8.         worker2.start();//    
    9.         latch.await();//等待所有工人完成工作    
    10.         System.out.println("all work done at "+sdf.format(new Date()));    
    11.     }    
    12.         
    13.         
    14.     static class Worker extends Thread{    
    15.         String workerName;     
    16.         int workTime;    
    17.         CountDownLatch latch;    
    18.         public Worker(String workerName ,int workTime ,CountDownLatch latch){    
    19.              this.workerName=workerName;    
    20.              this.workTime=workTime;    
    21.              this.latch=latch;    
    22.         }    
    23.         public void run(){    
    24.             System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));    
    25.             doWork();//工作了    
    26.             System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));    
    27.             latch.countDown();//工人完成工作,计数器减一    
    28.     
    29.         }    
    30.             
    31.         private void doWork(){    
    32.             try {    
    33.                 Thread.sleep(workTime);    
    34.             } catch (InterruptedException e) {    
    35.                 e.printStackTrace();    
    36.             }    
    37.         }    
    38.     }    
    39.         
    40.          
    41. }   


    CyclicBarrier        : N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
    这样应该就清楚一点了,对于CountDownLatch来说,重点是那个“一个线程”, 是它在等待, 而另外那N的线程在把“某个事情”做完之后可以继续等待,可以终止。而对于CyclicBarrier来说,重点是那N个线程,他们之间任何一个没有完成,所有的线程都必须等待。

     

    CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.

     

    http://blog.csdn.net/gongpulin/article/details/51236398

     


    [java]
     view plain copy
     
     
    1. import java.util.concurrent.BrokenBarrierException;     
    2. import java.util.concurrent.CyclicBarrier;      
    3. public class CyclicBarrierTest {     
    4.         public static void main(String[] args) {     
    5.                 //创建CyclicBarrier对象,    
    6.                 //并设置执行完一组5个线程的并发任务后,再执行MainTask任务    
    7.                 CyclicBarrier cb = new CyclicBarrier(5new MainTask());     
    8.                 new SubTask("A", cb).start();     
    9.                 new SubTask("B", cb).start();     
    10.                 new SubTask("C", cb).start();     
    11.                 new SubTask("D", cb).start();     
    12.                 new SubTask("E", cb).start();    
    13.         }     
    14. }     
    15.     
    16. /**   
    17. * 最后执行的任务  
    18. */     
    19. class MainTask implements Runnable {     
    20.         public void run() {     
    21. //根据jdkdoc里的描述,哪个线程最后运行完,就执行下面的代码。
    22.                 System.out.println("......执行最后的任务了......");     
    23.         }     
    24. }     
    25.     
    26. /**   
    27. * 一组并发任务   
    28. */     
    29. class SubTask extends Thread {     
    30.         private String name;     
    31.         private CyclicBarrier cb;     
    32.     
    33.         SubTask(String name, CyclicBarrier cb) {     
    34.                 this.name = name;     
    35.                 this.cb = cb;     
    36.         }     
    37.     
    38.         public void run() {     
    39.                 System.out.println("[并发任务" + name + "]  开始执行");     
    40.                 for (int i = 0; i < 999999; i++) ;    //模拟耗时的任务     
    41.                 System.out.println("[并发任务" + name + "]  开始执行完毕,通知障碍器");     
    42.                 try {     
    43.                         //每执行完一项任务就通知障碍器     
    44.                         cb.await();     
    45.                 } catch (InterruptedException e) {     
    46.                         e.printStackTrace();     
    47.                 } catch (BrokenBarrierException e) {     
    48.                         e.printStackTrace();     
    49.                 }     
    50.         }     
    51. }   
    结果:

     

     

    [并发任务A]  开始执行
    [并发任务B]  开始执行
    [并发任务B]  开始执行完毕,通知障碍器
    [并发任务C]  开始执行
    [并发任务C]  开始执行完毕,通知障碍器
    [并发任务A]  开始执行完毕,通知障碍器
    [并发任务D]  开始执行
    [并发任务D]  开始执行完毕,通知障碍器
    [并发任务E]  开始执行
    [并发任务E]  开始执行完毕,通知障碍器
    ......执行最后的任务了......





    4. ExecutorService类提供其他版本的invokeAll()阻塞——这还是一个更详细future的例子,其机制类似于join,这里拿出来


    线程执行者(六)运行多个任务并处理所有结果

    http://ifeve.com/thread-executors-6/


    执行者框架允许你在不用担心线程创建和执行的情况下,并发的执行任务。它还提供了Future类,这个类可以用来控制任务的状态,也可以用来获得执行者执行任务的结果。

    如果你想要等待一个任务完成,你可以使用以下两种方法:

    • 如果任务执行完成,Future接口的isDone()方法将返回true。——注意,是立即返回,不会阻塞等待
    • ThreadPoolExecutor类的awaitTermination()方法使线程进入睡眠,直到每一个任务调用shutdown()方法之后完成执行。

    这两种方法都有一些缺点。第一个方法,你只能控制一个任务的完成。第二个方法,你必须等待一个线程来关闭执行者,否则这个方法的调用立即返回。

    ThreadPoolExecutor类提供一个方法,允许你提交任务列表给执行者,并且在这个列表上等待所有任务的完成。在这个指南中,你将学习如何使用这个特性,实现一个示例,执行3个任务,并且当它们完成时将结果打印出来。

    准备工作…

    这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

    如何做…

    按以下步骤来实现的这个例子:

    1.创建Result类,存储这个示例中并发任务产生的结果。

    1 public class Result {

    2.声明两个私有属性。一个String属性,名为name,另一个int属性,名为value。

    1 private String name;
    2 private int value;

    3.实现相应的get()和set()方法,用来设置和获取name和value属性的值。

    01 public String getName() {
    02 return name;
    03 }
    04 public void setName(String name) {
    05 this.name = name;
    06 }
    07 public int getValue() {
    08  
    09 return value;
    10 }
    11 public void setValue(int value) {
    12 this.value = value;
    13 }

    4.创建Task类,实现Callable接口,参数化为Result类型。

    1 public class Task implements Callable<Result> {

    5.声明一个私有String属性,名为name。

    1 private String name;

    6.实现Task类构造器,初始化这个属性。

    1 public Task(String name) {
    2 this.name=name;
    3 }

    7.实现这个类的call()方法,在本例中,它将返回一个Result对象。

    1 @Override
    2 public Result call() throws Exception {

    8.首先,写入一个信息到控制台,表明任务开始。

    1 System.out.printf("%s: Staring ",this.name);

    9.然后,等待一个随机时间。

    1 try {
    2 long duration=(long)(Math.random()*10);
    3 System.out.printf("%s: Waiting %d seconds for results. ",this.name,duration);
    4 TimeUnit.SECONDS.sleep(duration);
    5 catch (InterruptedException e) {
    6 e.printStackTrace();
    7 }

    10.在Result对象中返回一个计算5个随机数的总和的int值。

    1 int value=0;
    2 for (int i=0; i<5; i++){
    3 value+=(int)(Math.random()*100);
    4 }

    11.创建Result对象,用任务的名称和前面操作结果来初始化它。

    1 Result result=new Result();
    2 result.setName(this.name);
    3 result.setValue(value);

    12.写入一个信息到控制台,表明任务已经完成。

    1 System.out.println(this.name+": Ends");

    13.返回Result对象。

    1 return result;
    2 }

    14.最后,实现这个示例的主类,创建Main类,实现main()方法。

    1 public class Main {
    2 public static void main(String[] args) {

    15.使用Executors类的newCachedThreadPool()方法,创建ThreadPoolExecutor对象。

    1 ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();

    16.创建Task对象列表。创建3个Task对象并且用这个列表来存储。

    1 List<Task> taskList=new ArrayList<>();
    2 for (int i=0; i<3; i++){
    3 Task task=new Task(i);
    4 taskList.add(task);
    5 }

    17.创建Future对象列表,参数化为Result类型。

    1 List<Future<Result>>resultList=null;

    18.调用ThreadPoolExecutor类的invokeAll()方法。这个类将会返回之前创建的Future对象列表。

    1 try {
    2 resultList=executor.invokeAll(taskList);
    3 catch (InterruptedException e) {
    4 e.printStackTrace();
    5 }

    19.使用shutdown()方法结束执行者。

    1 executor.shutdown();

    20.写入处理Future对象列表任务的结果。

    01 System.out.println("Main: Printing the results");
    02 for (int i=0; i<resultList.size(); i++){
    03 Future<Result> future=resultList.get(i);
    04 try {
    05 Result result=future.get();
    06 System.out.println(result.getName()+": "+result.
    07 getValue());
    08 catch (InterruptedException | ExecutionException e) {
    09 e.printStackTrace();
    10 }
    11 }

    它是如何工作的…

    在这个指南中,你已经学习了如何提交任务列表到执行者和使用invokeAll()方法等待它们的完成。这个方法接收Callable对象列表和返回 Future对象列表。这个列表将会有列表中每个任务的一个Future对象。Future对象列表的第一个对象是Callable对象列表控制的第一个任务,以此类推。

    第一点要考虑的是,在存储结果对象的列表中声明的Future接口参数化的数据类型必须与使用的Callable对象的参数化相兼容。在本例中,你已经使用相同数据类型:Result类型。

    另一个重要的一点就是关于invokeAll()方法,你会使用Future对象获取任务的结果。当所有的任务完成时,这个方法结束,如果你调用返回的Future对象的isDone()方法,所有调用都将返回true值。

    不止这些…

    ExecutorService类提供其他版本的invokeAll()方法:

    • invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit):此方法执行所有任务,当它们全部完成且未超时,返回它们的执行结果。TimeUnit类是个枚举类,有如下常量:DAYS,HOURS,MICROSECONDS, MILLISECONDS, MINUTES,,NANOSECONDS 和SECONDS。



    个人认为,此处用invoke方法不好,如同join一样,会阻塞线程发起方,主线程在等待返回的时候啥都做不了,但是意思是这个意思,符合题目


    小结:
    1.future模式
    2.join阻塞
    3.ExecutorService.invokeAll阻塞
    4.countDownLatch
    5.SyclicBarrier
    6.回调-观察者模式:https://www.cnblogs.com/silyvin/p/9106633.html
    7.锁 Synchronized ReetraintLock,本质与join同


  • 相关阅读:
    数据库数据格式化之Kettle Spoon
    NopCommerce开源项目中很基础但是很实用的C# Helper方法
    oracle 两个逗号分割的字符串 如何判断是否其中有相同值
    MongoDB+MongoVUE安装及入门
    C#中Dictionary<TKey,TValue>排序方式
    kettle的基本介绍
    Kettle能做什么?
    oracle like 条件拼接
    loading加载和layer.js
    关于bootstrap的treeview不显示多选(复选框)的问题,以及联动选择的问题,外加多选后取值
  • 原文地址:https://www.cnblogs.com/silyvin/p/9106682.html
Copyright © 2011-2022 走看看