zoukankan      html  css  js  c++  java
  • [转载] java多线程学习-java.util.concurrent详解(二)Semaphore/FutureTask/Exchanger

    转载自http://janeky.iteye.com/blog/770393

    ----------------------------------------------------------------------------- 

    3. Semaphore 
        我们先来学习一下JDK1.5 API中关于这个类的详细介绍: 
    “一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。” 

        我们一般用它来控制某个对象的线程访问对象 

        例如,对于某个容器,我们规定,最多只能容纳n个线程同时操作 
    使用信号量来模拟实现 

    具体代码如下(参考 [JCIP]) 

    Java代码  收藏代码
    1. import java.util.Collections;  
    2. import java.util.HashSet;  
    3. import java.util.Set;  
    4. import java.util.concurrent.ExecutorService;  
    5. import java.util.concurrent.Executors;  
    6. import java.util.concurrent.Semaphore;  
    7.   
    8. public class TestSemaphore {  
    9.   
    10.     public static void main(String[] args) {  
    11.         ExecutorService exec = Executors.newCachedThreadPool();  
    12.         TestSemaphore t = new TestSemaphore();  
    13.         final BoundedHashSet<String> set = t.getSet();  
    14.   
    15.         for (int i = 0; i < 3; i++) {//三个线程同时操作add  
    16.             exec.execute(new Runnable() {  
    17.                 public void run() {  
    18.                     try {  
    19.                         set.add(Thread.currentThread().getName());  
    20.                     } catch (InterruptedException e) {  
    21.                         e.printStackTrace();  
    22.                     }  
    23.                 }  
    24.             });  
    25.         }  
    26.   
    27.         for (int j = 0; j < 3; j++) {//三个线程同时操作remove  
    28.             exec.execute(new Runnable() {  
    29.                 public void run() {  
    30.                     set.remove(Thread.currentThread().getName());  
    31.                 }  
    32.             });  
    33.         }  
    34.         exec.shutdown();  
    35.     }  
    36.   
    37.     public BoundedHashSet<String> getSet() {  
    38.         return new BoundedHashSet<String>(2);//定义一个边界约束为2的线程  
    39.     }  
    40.   
    41.     class BoundedHashSet<T> {  
    42.         private final Set<T> set;  
    43.         private final Semaphore semaphore;  
    44.   
    45.         public BoundedHashSet(int bound) {  
    46.             this.set = Collections.synchronizedSet(new HashSet<T>());  
    47.             this.semaphore = new Semaphore(bound, true);  
    48.         }  
    49.   
    50.         public void add(T o) throws InterruptedException {  
    51.             semaphore.acquire();//信号量控制可访问的线程数目  
    52.             set.add(o);  
    53.             System.out.printf("add:%s%n",o);  
    54.         }  
    55.   
    56.         public void remove(T o) {  
    57.             if (set.remove(o))  
    58.                 semaphore.release();//释放掉信号量  
    59.             System.out.printf("remove:%s%n",o);  
    60.         }  
    61.     }  
    62. }  



        总结:Semaphore通常用于对象池的控制 

    4.FutureTask 
        我们先来学习一下JDK1.5 API中关于这个类的详细介绍: 

        “取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。 
    可使用 FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。 
    除了作为一个独立的类外,此类还提供了 protected 功能,这在创建自定义任务类时可能很有用。 “ 

        应用举例:我们的算法中有一个很耗时的操作,在编程的是,我们希望将它独立成一个模块,调用的时候当做它是立刻返回的,并且可以随时取消的 

    具体代码如下(参考 [JCIP]) 

    Java代码  收藏代码
    1. import java.util.concurrent.Callable;  
    2. import java.util.concurrent.ExecutionException;  
    3. import java.util.concurrent.ExecutorService;  
    4. import java.util.concurrent.Executors;  
    5. import java.util.concurrent.FutureTask;  
    6.   
    7. public class TestFutureTask {  
    8.   
    9.     public static void main(String[] args) {  
    10.         ExecutorService exec=Executors.newCachedThreadPool();  
    11.           
    12.         FutureTask<String> task=new FutureTask<String>(new Callable<String>(){//FutrueTask的构造参数是一个Callable接口  
    13.             @Override  
    14.             public String call() throws Exception {  
    15.                 return Thread.currentThread().getName();//这里可以是一个异步操作  
    16.             }});  
    17.               
    18.             try {  
    19.                 exec.execute(task);//FutureTask实际上也是一个线程  
    20.                 String result=task.get();//取得异步计算的结果,如果没有返回,就会一直阻塞等待  
    21.                 System.out.printf("get:%s%n",result);  
    22.             } catch (InterruptedException e) {  
    23.                 e.printStackTrace();  
    24.             } catch (ExecutionException e) {  
    25.                 e.printStackTrace();  
    26.             }  
    27.     }  
    28.   
    29. }  



        总结:FutureTask其实就是新建了一个线程单独执行,使得线程有一个返回值,方便程序的编写 

    5. Exchanger 
        我们先来学习一下JDK1.5 API中关于这个类的详细介绍: 
        “可以在pair中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。 “ 

        应用举例:有两个缓存区,两个线程分别向两个缓存区fill和take,当且仅当一个满了,两个缓存区交换 

        代码如下(参考了网上给的示例   http://hi.baidu.com/webidea/blog/item/2995e731e53ad5a55fdf0e7d.html) 

    Java代码  收藏代码
    1. import java.util.ArrayList;  
    2. import java.util.concurrent.Exchanger;  
    3.   
    4. public class TestExchanger {  
    5.   
    6.     public static void main(String[] args) {  
    7.         final Exchanger<ArrayList<Integer>> exchanger = new Exchanger<ArrayList<Integer>>();  
    8.         final ArrayList<Integer> buff1 = new ArrayList<Integer>(10);  
    9.         final ArrayList<Integer> buff2 = new ArrayList<Integer>(10);  
    10.   
    11.         new Thread(new Runnable() {  
    12.             @Override  
    13.             public void run() {  
    14.                 ArrayList<Integer> buff = buff1;  
    15.                 try {  
    16.                     while (true) {  
    17.                         if (buff.size() >= 10) {  
    18.                             buff = exchanger.exchange(buff);//开始跟另外一个线程交互数据  
    19.                             System.out.println("exchange buff1");  
    20.                             buff.clear();  
    21.                         }  
    22.                         buff.add((int)(Math.random()*100));  
    23.                         Thread.sleep((long)(Math.random()*1000));  
    24.                     }  
    25.                 } catch (InterruptedException e) {  
    26.                     e.printStackTrace();  
    27.                 }  
    28.             }  
    29.         }).start();  
    30.           
    31.         new Thread(new Runnable(){  
    32.             @Override  
    33.             public void run() {  
    34.                 ArrayList<Integer> buff=buff2;  
    35.                 while(true){  
    36.                     try {  
    37.                         for(Integer i:buff){  
    38.                             System.out.println(i);  
    39.                         }  
    40.                         Thread.sleep(1000);  
    41.                         buff=exchanger.exchange(buff);//开始跟另外一个线程交换数据  
    42.                         System.out.println("exchange buff2");  
    43.                     } catch (InterruptedException e) {  
    44.                         e.printStackTrace();  
    45.                     }  
    46.                 }  
    47.             }}).start();  
    48.     }  
    49. }  



        总结:Exchanger在特定的使用场景比较有用(两个伙伴线程之间的数据交互) 

  • 相关阅读:
    Docker安装以及运行第一个HelloWorld
    logstash-配置文件详解
    oh my zsh 常用插件
    Linux之Shell基本命令
    Linux的基本命令
    Vue
    rest_framwork之认证组件,权限组件,频率组件
    rest_framwork之序列化组件
    rest_framwork之APIView
    中间件
  • 原文地址:https://www.cnblogs.com/scott19820130/p/4614951.html
Copyright © 2011-2022 走看看