zoukankan      html  css  js  c++  java
  • JUC学习笔记--JUC中并发工具类

    JUC中并发工具类

    CountDownLatch

    CountDownLatch是我目前使用比较多的类,CountDownLatch初始化时会给定一个计数,然后每次调用countDown() 计数减1,
    当计数未到达0之前调用await() 方法会阻塞直到计数减到0;

    使用场景:多用于划分任务由多个线程执行,例如:最近写个豆瓣爬虫,需要爬取每个电影的前五页短评,可以划分成五个线程来处理数据。通过latch.await()保证全部完成再返回。

     public void latch() throws InterruptedException {
            int count= 5;
            CountDownLatch latch = new CountDownLatch(count);
            for (int x=0;x<count;x++){
                new Worker(x*20,latch).start();
            }
            latch.await();
            System.out.println("全部执行完毕");
        }
        class Worker extends Thread{
            Integer start;
            CountDownLatch latch;
            public Worker(Integer start,CountDownLatch latch){
                this.start=start;
                this.latch=latch;
            }
    
            @Override
            public void run() {
                System.out.println(start+" 已执行");
                latch.countDown();
            }
        }
    
    /*
      20 已执行
      0 已执行
      40 已执行
      60 已执行
      80 已执行
      全部执行完毕
    */
    

    CyclicBarrier

    它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)也就是阻塞在调用cyclicBarrier.await()的地方。

    看上去CyclicBarrier 跟CountDownLatch 功能上类似,在官方doc上CountDownLatch的描述上就说明了,CountDownLatch 的计数无法被重置,
    如果需要重置计数,请考虑使用CyclicBarrier。

    CyclicBarrier初始时还可添加一个Runnable的参数, 此Runnable在CyclicBarrier的数目达到后,所有其它线程被唤醒前被最后一个进入 CyclicBarrier 的线程执行

    使用场景:类似CyclicBarrier,但是 CyclicBarrier提供了几个countdownlatch 没有的方法以应付更复杂的场景,例如:
    getNumberWaiting() 获取阻塞线程数量,
    isBroken() 用来知道阻塞的线程是否被中断等方法。
    reset() 将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。

      public void latch() throws InterruptedException {
            int count = 5;
            CyclicBarrier cb = new CyclicBarrier(count, new Runnable() {
                @Override
                public void run() {
                    System.out.println("全部执行完毕");
                }
            });
            ExecutorService executorService = Executors.newFixedThreadPool(count);
            while (true){
                for (int x=0;x<count;x++){
                    executorService.execute(new Worker(x,cb));
                }
            }
        }
    
        class Worker extends Thread {
            Integer start;
            CyclicBarrier cyclicBarrier;
    
            public Worker(Integer start, CyclicBarrier cyclicBarrier) {
                this.start = start;
                this.cyclicBarrier = cyclicBarrier;
            }
    
            @Override
            public void run() {
                System.out.println(start + " 已执行");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    /*
    0 已执行
    3 已执行
    4 已执行
    2 已执行
    1 已执行
    全部执行完毕
    0 已执行
    1 已执行
    2 已执行
    3 已执行
    4 已执行
    全部执行完毕
    .....
    */
    

    Semaphore

    Semaphore 信号量维护了一个许可集,每次使用时执行acquire()从Semaphore获取许可,如果没有则会阻塞,每次使用完执行release()释放许可。

    使用场景:Semaphore对用于对资源的控制,比如数据连接有限,使用Semaphore限制访问数据库的线程数。

        public void latch() throws InterruptedException, IOException {
            int count = 5;
            Semaphore semaphore = new Semaphore(1);
            ExecutorService executorService = Executors.newFixedThreadPool(count);
                for (int x=0;x<count;x++){
                    executorService.execute(new Worker(x,semaphore));
                }
            System.in.read();
        }
    
        class Worker extends Thread {
            Integer start;
            Semaphore semaphore;
    
            public Worker(Integer start, Semaphore semaphore) {
                this.start = start;
                this.semaphore = semaphore;
            }
    
            @Override
            public void run() throws IllegalArgumentException {
                try {
                    System.out.println(start + " 准备执行");
                    TimeUnit.SECONDS.sleep(1);
                    semaphore.acquire();
                    System.out.println(start + " 已经执行");
                    semaphore.release();
                    System.out.println(start + " 已经释放");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
    /*
    0 准备执行
    2 准备执行
    1 准备执行
    3 准备执行
    4 准备执行
    2 已经执行
    2 已经释放
    4 已经执行
    4 已经释放
    1 已经执行
    1 已经释放
    0 已经执行
    0 已经释放
    3 已经执行
    3 已经释放
    */
    

    Exchanger

    Exchanger 用于两个线程间的数据交换,它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。

    使用场景: 两个线程相互等待处理结果并进行数据传递。

        public void latch() throws InterruptedException, IOException {
            int count = 5;
            Exchanger<String> exchanger = new Exchanger<>();
            ExecutorService executorService = Executors.newFixedThreadPool(count);
                for (int x=0;x<count;x++){
                    executorService.execute(new Worker(x,exchanger));
                }
            System.in.read();
        }
    
        class Worker extends Thread {
            Integer start;
            Exchanger<String>  exchanger;
    
            public Worker(Integer start, Exchanger<String> exchanger) {
                this.start = start;
                this.exchanger = exchanger;
            }
    
            @Override
            public void run() throws IllegalArgumentException {
                try {
                    System.out.println(Thread.currentThread().getName() + " 准备执行");
                    TimeUnit.SECONDS.sleep(start);
                    System.out.println(Thread.currentThread().getName() + " 等待交换");
                    String value = exchanger.exchange(Thread.currentThread().getName());
                    System.out.println(Thread.currentThread().getName() + " 交换得到数据为:"+value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
    /*
    pool-1-thread-1 准备执行
    pool-1-thread-1 等待交换
    pool-1-thread-3 准备执行
    pool-1-thread-2 准备执行
    pool-1-thread-5 准备执行
    pool-1-thread-4 准备执行
    pool-1-thread-2 等待交换
    pool-1-thread-1 交换得到数据为:pool-1-thread-2
    pool-1-thread-2 交换得到数据为:pool-1-thread-1
    pool-1-thread-3 等待交换
    pool-1-thread-4 等待交换
    pool-1-thread-4 交换得到数据为:pool-1-thread-3
    pool-1-thread-3 交换得到数据为:pool-1-thread-4
    pool-1-thread-5 等待交换
    */
    

    Exchanger必须成对出现,否则会像上面代码执行结果那样,pool-1-thread-5一直阻塞等待与其交换数据的线程,为了避免这一现象,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长

  • 相关阅读:
    四、运算符
    三、Golang 变量
    二、Golang的概述
    一、Golang开山篇
    部分技术使用
    Teleport_实战
    zabbix_浅谈
    渗透测试工具集合(漏洞练习平台)
    常见开源监控软件的介绍
    Ansible-大保健
  • 原文地址:https://www.cnblogs.com/javanoob/p/juc_utils.html
Copyright © 2011-2022 走看看