zoukankan      html  css  js  c++  java
  • Java中的并发工具类

    CountDownLatch  

      CountDownLatch允许一个或多个线程等待其他线程完成操作

      当需要解析一个Excel里面有多个sheet数据时,可以使用多线程,每个线程解析一个sheet里的数据。主线程等待所有线程执行完sheet的解析操作。

      

    public class JoinCountDownLatchTest(){
    
      public static void main(String[] args){
        Thread parser1 = new Thread(new Runnable)_{
          public void run(){
          }
        });
        Thread parser2 = new Thread(new Runnable)_{
          public void run(){
          }
        });
        parser1.start();
        parser2.start();
        parser1.join();
        parser1.join();
        System.out.print("all parsers finished");
      }
    
    }

      join用于让当前执行线程等待join线程执行结束。其原理是不停检查join线程是否存活,若join线程存活则让当前线程永远等待。直到join()线程中之后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是JVM里实现的。

    public class CountDownLatchTest{
      static CountDownLatch c = new CountDownLatch(2);
      public static void main(String[] args) throws InterruptedException{
        new Thread(new Runnable(){
          System.out.println(1);
          c.countDown();
          System.out.println(2);
          c.countDown();
        }).start();
        c.wait();
        System.out.println("3");
      }
    }

      CountDownLatch的构造函数接受一个int类型的参数作为计数器,若想等待N个点完成,就传入N。当我们调用CountDownLatch的countDown方法时,N就减1,CountDownLatch的await方法会阻塞当前线程,直到N变成0。由于countDown方法可以用在任何定法,即可以使N个线程。只需要把这个CountDownLatch的引用传递到线程里即可。若某个线程处理的比较慢,可以使用await的重载方法await(long time, TimeUnit unit)。

      计数器必须大于0,当计数器等于0时,调用await()方法不会阻塞当前线程。CountDownLatch不可能重新初始化或修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before另外一个线程调用await方法。

    同步屏障CyclicBarrier

      CyclicBarrier的意思是可循环使用(Cyclic)的屏障(Barrier)。让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行。

      CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法高速CyclicBarrier我已经到达了屏障,然后当前贤臣被阻塞。

    public class CyclicBarrireTest{
      static CyclicBarrier c = new CyclicBarrier(2);
      public static void main(String[] args){
        new Thread(new Runnable(){
          public void run(){
            try{
              c.await();
            }catch(Exception e){
            }
            System.out.println(1);
          }
        }).start();
        try{
          c.await();
        }catch(Exception e){
          System.out.println(2);
        }
      }
    }

      若把new CyclicBarrier(2)修改成new CyclicBarrire(3),则主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程达到屏障,所以之前到达屏障的两个线程都不会继续执行。

      CyclicBarrier还提供给了构造函数CyclicBarrier(int parties, Runnable barrierAction)用于在线程到达屏障时先执行barrierAction。

    public class CyclicBarrierTest2{
      static CyclicBarrier c = new CyclicBarrier(2, new A());
      public static void main(String[] args){
        new Thread(new Runnable(){
          public void run(){
            try{
              c.await(); 
            }catch(Exception e){}
            System.out.println(1);
          }
        }).start();
        try{ 
          c.await();
        }catch(Exception e){}
        System.out.println(2);
      }
    
      static class A implements Runnable{
        public void run(){
          System.out.println(3);
        }
      }
    }

      CyclicBarrier可用于多线程计算数据最后合并计算结果的场景。

    public class BankWaterService implements Runnable{
      private CyclicBarrier c = new CyclicBarrier(4, this);
      private Executor executor = Executors.newFixedThreadPool(4, this);
      private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
      
      private void count(){
        for(int i = 0; i < 4; i++){
          executor.execute(new Runnable(){
            public void run(){
              sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
              try{
               c.await(); 
              }catch(InterruptedException | BrokenBarrierException e){
                e.printStackTrace();
              }
            }
          });
        }
      }
    
      public void run(){
        int result = 0;
        for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()){
          result += sheet.getValue();
        }
        sheetBankWaterCount.put("result". result);
        System.out.println(result);
      }
    
      public static void main(String[] args){
        BankWaterService bankWaterCount = new BankWaterService();
        bankWaterCount.count();
      }
      
    }
    public class BankWaterService implements Runnable{
      private CyclicBarrier c = new CyclicBarrier(4, this);
      private Executor executor = Executors.newFixedThreadPool(4, this);
      private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
      
      private void count(){
        for(int i = 0; i < 4; i++){
          executor.execute(new Runnable(){
            public void run(){
              sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
              try{
               c.await(); 
              }catch(InterruptedException | BrokenBarrierException e){
                e.printStackTrace();
              }
            }
          });
        }
      }
    
      public void run(){
        int result = 0;
        for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()){
          result += sheet.getValue();
        }
        sheetBankWaterCount.put("result". result);
        System.out.println(result);
      }
    
      public static void main(String[] args){
        BankWaterService bankWaterCount = new BankWaterService();
        bankWaterCount.count();
      }
      
    }

    CyclicBarrier和CountDownLatch的区别

      CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。CyclicBarrier还提供了getNumberWaiting方法获取CyclicBarrier阻塞的线程数量。isBroker()方法用来了解阻塞的线程是否被中断。

    Semaphore

      Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

      Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,如连接数据库。

    public class SemaphoreTest(){
      private static final int THREAD_COUNT = 30;
      private static final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
      private static Semaphore s = new Semaphore(10);
      public static void main(String[] args){
        for(int i = 0; i < THREAD_COUNT; i++){
          threadPool.executor(new Runnable(){
            try{
             s.acquire();
             System.out.println("save data");
             s.release();
            }catch(InterruptedException e){}
          });
        }
        threadPool.shutdown();
      }
    }

      Semaphore提供如下方法:

        acquire():获取一个许可证

        release():归还许可证

        int avaliablePermits():返回信号量中当前可用的许可证书

        int getQueueLength():返回正在等待获取许可证的线程书

                  boolean hasQueuedThreads():是否有线程正在等待获取许可证

        void reducePermits(int reduction):减少reduction个许可证,protected

        Collection getQueuedThreads():返回所有等待获取许可证的线程集合,protected

    Exchanger

      Exchanger是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,若第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange()方法,当两个线程都到达同步点时,这两个线程可以交换数据,将本线程生产出的数据传递给对方。

      Exchange可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候交换两个人数据,并使用交叉规则得出2个交配的结果。同时Exchanger也可以用于校对工作。为了避免错误,采用AB岗两人进行录入数据到Excel后,系统加载两个Excel并对两个Excel数据进行校对。

    public class ExchangerTest{
      private static final Exchanger<String> exgr = new Exchanger<String>();
      private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
      
      public static void main(String[] args){
        threadPool.execute(new Runnble(){
          public void run(){
            try{
              String A = "banker A";
              exgr.exchange(A);
            }catch(InterruptedException){}
          }
        });
        threadPool.execute(new Runnable(){
          public void run(){
            String B = "banker B";
            String A = exgr.exchange(B);
            System.out.println("A equals to B : " + A.equals(B));
          }
        });
      threadPool.shutdown(); 
      }
    }

      若两个线程有一个没有执行exchange()方法,则会一直等待。此时为避免一直等候,可以调用exchange(V x, long timeout, TimeUnit unit)来设置最大等待时长。

  • 相关阅读:
    虚函数和纯虚函数
    MS CRM 2011中PartyList类型字段的实例化
    MS CRM 2011的自定义与开发(12)——表单脚本扩展开发(4)
    MS CRM 2011的自定义与开发(12)——表单脚本扩展开发(2)
    MS CRM 2011的自定义和开发(10)——CRM web服务介绍(第二部分)——IOrganizationService(二)
    MS CRM 2011 SDK 5.08已经发布
    MS CRM 2011 Q2的一些更新
    最近很忙
    Microsoft Dynamics CRM 2011最近的一些更新
    补一篇,Update Rollup 12 终于发布了
  • 原文地址:https://www.cnblogs.com/forerver-elf/p/7676449.html
Copyright © 2011-2022 走看看