zoukankan      html  css  js  c++  java
  • Java中的并发工具类

    CountDownLatch  

      CountDownLatch允许一个或多个线程等待其他线程完成操作

      当需要解析一个Excel里面有多个sheet数据时,可以使用多线程,每个线程解析一个sheet里的数据。主线程等待所有线程执行完sheet的解析操作。

      

    public class JoinCountDownLatchTest(){
    
      public static void main(String[] args){
        Thread parser1 = new Thread(new Runnable)_{
          public void run(){
          }
        });
        Thread parser2 = new Thread(new Runnable)_{
          public void run(){
          }
        });
        parser1.start();
        parser2.start();
        parser1.join();
        parser1.join();
        System.out.print("all parsers finished");
      }
    
    }

      join用于让当前执行线程等待join线程执行结束。其原理是不停检查join线程是否存活,若join线程存活则让当前线程永远等待。直到join()线程中之后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是JVM里实现的。

    public class CountDownLatchTest{
      static CountDownLatch c = new CountDownLatch(2);
      public static void main(String[] args) throws InterruptedException{
        new Thread(new Runnable(){
          System.out.println(1);
          c.countDown();
          System.out.println(2);
          c.countDown();
        }).start();
        c.wait();
        System.out.println("3");
      }
    }

      CountDownLatch的构造函数接受一个int类型的参数作为计数器,若想等待N个点完成,就传入N。当我们调用CountDownLatch的countDown方法时,N就减1,CountDownLatch的await方法会阻塞当前线程,直到N变成0。由于countDown方法可以用在任何定法,即可以使N个线程。只需要把这个CountDownLatch的引用传递到线程里即可。若某个线程处理的比较慢,可以使用await的重载方法await(long time, TimeUnit unit)。

      计数器必须大于0,当计数器等于0时,调用await()方法不会阻塞当前线程。CountDownLatch不可能重新初始化或修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before另外一个线程调用await方法。

    同步屏障CyclicBarrier

      CyclicBarrier的意思是可循环使用(Cyclic)的屏障(Barrier)。让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行。

      CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法高速CyclicBarrier我已经到达了屏障,然后当前贤臣被阻塞。

    public class CyclicBarrireTest{
      static CyclicBarrier c = new CyclicBarrier(2);
      public static void main(String[] args){
        new Thread(new Runnable(){
          public void run(){
            try{
              c.await();
            }catch(Exception e){
            }
            System.out.println(1);
          }
        }).start();
        try{
          c.await();
        }catch(Exception e){
          System.out.println(2);
        }
      }
    }

      若把new CyclicBarrier(2)修改成new CyclicBarrire(3),则主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程达到屏障,所以之前到达屏障的两个线程都不会继续执行。

      CyclicBarrier还提供给了构造函数CyclicBarrier(int parties, Runnable barrierAction)用于在线程到达屏障时先执行barrierAction。

    public class CyclicBarrierTest2{
      static CyclicBarrier c = new CyclicBarrier(2, new A());
      public static void main(String[] args){
        new Thread(new Runnable(){
          public void run(){
            try{
              c.await(); 
            }catch(Exception e){}
            System.out.println(1);
          }
        }).start();
        try{ 
          c.await();
        }catch(Exception e){}
        System.out.println(2);
      }
    
      static class A implements Runnable{
        public void run(){
          System.out.println(3);
        }
      }
    }

      CyclicBarrier可用于多线程计算数据最后合并计算结果的场景。

    public class BankWaterService implements Runnable{
      private CyclicBarrier c = new CyclicBarrier(4, this);
      private Executor executor = Executors.newFixedThreadPool(4, this);
      private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
      
      private void count(){
        for(int i = 0; i < 4; i++){
          executor.execute(new Runnable(){
            public void run(){
              sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
              try{
               c.await(); 
              }catch(InterruptedException | BrokenBarrierException e){
                e.printStackTrace();
              }
            }
          });
        }
      }
    
      public void run(){
        int result = 0;
        for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()){
          result += sheet.getValue();
        }
        sheetBankWaterCount.put("result". result);
        System.out.println(result);
      }
    
      public static void main(String[] args){
        BankWaterService bankWaterCount = new BankWaterService();
        bankWaterCount.count();
      }
      
    }
    public class BankWaterService implements Runnable{
      private CyclicBarrier c = new CyclicBarrier(4, this);
      private Executor executor = Executors.newFixedThreadPool(4, this);
      private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
      
      private void count(){
        for(int i = 0; i < 4; i++){
          executor.execute(new Runnable(){
            public void run(){
              sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
              try{
               c.await(); 
              }catch(InterruptedException | BrokenBarrierException e){
                e.printStackTrace();
              }
            }
          });
        }
      }
    
      public void run(){
        int result = 0;
        for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()){
          result += sheet.getValue();
        }
        sheetBankWaterCount.put("result". result);
        System.out.println(result);
      }
    
      public static void main(String[] args){
        BankWaterService bankWaterCount = new BankWaterService();
        bankWaterCount.count();
      }
      
    }

    CyclicBarrier和CountDownLatch的区别

      CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。CyclicBarrier还提供了getNumberWaiting方法获取CyclicBarrier阻塞的线程数量。isBroker()方法用来了解阻塞的线程是否被中断。

    Semaphore

      Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

      Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,如连接数据库。

    public class SemaphoreTest(){
      private static final int THREAD_COUNT = 30;
      private static final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
      private static Semaphore s = new Semaphore(10);
      public static void main(String[] args){
        for(int i = 0; i < THREAD_COUNT; i++){
          threadPool.executor(new Runnable(){
            try{
             s.acquire();
             System.out.println("save data");
             s.release();
            }catch(InterruptedException e){}
          });
        }
        threadPool.shutdown();
      }
    }

      Semaphore提供如下方法:

        acquire():获取一个许可证

        release():归还许可证

        int avaliablePermits():返回信号量中当前可用的许可证书

        int getQueueLength():返回正在等待获取许可证的线程书

                  boolean hasQueuedThreads():是否有线程正在等待获取许可证

        void reducePermits(int reduction):减少reduction个许可证,protected

        Collection getQueuedThreads():返回所有等待获取许可证的线程集合,protected

    Exchanger

      Exchanger是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,若第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange()方法,当两个线程都到达同步点时,这两个线程可以交换数据,将本线程生产出的数据传递给对方。

      Exchange可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候交换两个人数据,并使用交叉规则得出2个交配的结果。同时Exchanger也可以用于校对工作。为了避免错误,采用AB岗两人进行录入数据到Excel后,系统加载两个Excel并对两个Excel数据进行校对。

    public class ExchangerTest{
      private static final Exchanger<String> exgr = new Exchanger<String>();
      private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
      
      public static void main(String[] args){
        threadPool.execute(new Runnble(){
          public void run(){
            try{
              String A = "banker A";
              exgr.exchange(A);
            }catch(InterruptedException){}
          }
        });
        threadPool.execute(new Runnable(){
          public void run(){
            String B = "banker B";
            String A = exgr.exchange(B);
            System.out.println("A equals to B : " + A.equals(B));
          }
        });
      threadPool.shutdown(); 
      }
    }

      若两个线程有一个没有执行exchange()方法,则会一直等待。此时为避免一直等候,可以调用exchange(V x, long timeout, TimeUnit unit)来设置最大等待时长。

  • 相关阅读:
    cpu几种架构区别
    linux之cp/scp命令+scp命令详解
    解读Linux命令格式(转)
    IO虚拟化简单了解
    NoSQL-来自维基百科
    kvm命令参数记录
    kvm 简单了解
    host与guest间共享文件夹的三种方法(原创)
    新装linux系统最基本设置
    kernel编译速度提高
  • 原文地址:https://www.cnblogs.com/forerver-elf/p/7676449.html
Copyright © 2011-2022 走看看