zoukankan      html  css  js  c++  java
  • 并发工具类

    并发工具类

    一、CountDownLatch

      CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。

      CountDownLatch是一个同步计数器,初始化的时候传入需要计数的线程等待数,可以是需要等待执行完成的线程数,或者大于;

      作用:用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。是一组线程等待其他的线程完成工作以后在执行,相当于加强版join;

      await():阻塞当前线程,等待其他线程执行完成,直达计数器计数值减到0;

      countDown():负责计算器的减一;

      比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

    package com.zn.CountDownLatch;
    
    import java.util.concurrent.CountDownLatch;
    
    public class CountDownLatchTest {
        public static void main(String[] args) throws InterruptedException {
            System.out.println("等待子线程执行完毕...");
            final CountDownLatch countDownLatch = new CountDownLatch(2);
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    System.out.println("子线程," + Thread.currentThread().getName() + "开始执行...");
                    countDownLatch.countDown();// 每次减去1
                    System.out.println("子线程," + Thread.currentThread().getName() + "结束执行...");
                }
            }).start();
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    System.out.println("子线程," + Thread.currentThread().getName() + "开始执行...");
                    countDownLatch.countDown();
                    System.out.println("子线程," + Thread.currentThread().getName() + "结束执行...");
                }
            }).start();
    
            countDownLatch.await();// 调用当前方法主线程阻塞  countDown结果为0, 阻塞变为运行状态
            System.out.println("两个子线程执行完毕....");
            System.out.println("继续主线程执行..");
        }
    }

    控制台效果:

      

    二、CyclicBarrier

      CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。 

       CyclicBarrier就象它名字的意思一样,可看成是个障碍, 是多线程中一个重要的类,主要用于线程内部之间的线程的相互等待问题,初始化的时候传入需要等待的线程数;

      作用:让一组线程达到某一个屏障被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程才会继续运行;

      CyclicBarrier(int parties):初始化定义需要等待的线程数parties;

      CyclicBarrier(int parties,Runnable barrierAction):当屏障开放的时候,线程barrierAction的任务会执行;

      CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。

    package com.zn.containerTest;
    
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierTest extends Thread{
    
        private CyclicBarrier cyclicBarrier;
    
        public CyclicBarrierTest(CyclicBarrier cyclicBarrier){
            this.cyclicBarrier=cyclicBarrier;
        }
    
        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + ",正在写入数据");
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                // TODO: handle exception
            }
            System.out.println("线程" + Thread.currentThread().getName() + ",写入数据成功.....");
    
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
            }
            System.out.println("所有线程执行完毕..........");
        }
    
    }
    
    class Test1 {
    
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier=new CyclicBarrier(5);
            for (int i = 0; i < 5; i++) {
                CyclicBarrierTest cyclicBarrierTest = new CyclicBarrierTest(cyclicBarrier);
                cyclicBarrierTest.start();
            }
        }
    }

    控制台效果:

      

    CountDownLatch和CyclicBarrier的区别:

      1.CountDownLatch放行由第三者控制,CyclicBarrier放行由一组线程本身控制;

      2.CountDownLatch放行条件>=线程数,CyclicBarrier放行条件=线程数;

      3.CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程;

      4.CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让我们重新执行一次;

    三、Semaphore

      Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。

      Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。

      它的用法如下:

        availablePermits函数用来获取当前可用的资源数量

        wc.acquire(); //申请资源

          wc.release();// 释放资源

      作用:Semaphore管理一系列许可证。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可证这个对象,Semaphore只是维持一个可获得许可证的数量,主要控制同时访问某个特定资源的线程数量,多用在流量控制;

      注意:其他Semaphore的底层显示就是基于AQS的共享锁实现的

      如果一个线程要访问共享资源,必须先获得信号量,如果信号量的计数器值大于1,意味着有共享资源可以访问,则使其计数器值减去1,在访问共享资源。如果计数器值为0,线程进入休眠。当某个线程使用完共享资源后,释放信号量,并将信号量内部的计数器加1,之间进入休眠的线程将被唤醒并再次试图获取信号量;

    package com.zn.containerTest;
    
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest {
        public static void main(String[] args) {
            // 创建一个计数阈值为5的信号量对象
            // 只能5个线程同时访问
            Semaphore semp = new Semaphore(5);
    
            try {
                // 申请许可
                semp.acquire();
                try {
                    // 业务逻辑
                } catch (Exception e) {
    
                } finally {
                    // 释放许可
                    semp.release();
                }
            } catch (InterruptedException e) {
    
            }
        }
    }

    代码测试:

      Parent类:

    package com.zn.containerTest;
    
    import java.util.Random;
    import java.util.concurrent.Semaphore;
    
    public class Parent implements Runnable {
    
        private String name;
        private Semaphore wc;
    
        public Parent(String name,Semaphore wc){
            this.name=name;
            this.wc=wc;
        }
    
        @Override
        public void run() {
            try {
                // 剩下的资源(剩下的茅坑)
                int availablePermits = wc.availablePermits();
                if (availablePermits > 0) {
                    System.out.println(name+"天助我也,终于有茅坑了...");
                } else {
                    System.out.println(name+"怎么没有茅坑了...");
                }
                //申请茅坑 如果资源达到3次,就等待
                wc.acquire();
                System.out.println(name+"终于轮我上厕所了..爽啊");
                Thread.sleep(new Random().nextInt(1000)); // 模拟上厕所时间。
                System.out.println(name+"厕所上完了...");
                wc.release();
    
            } catch (Exception e) {
    
            }
        }
    }

      TestSemaphore1类:

    package com.zn.containerTest;
    
    import java.util.concurrent.Semaphore;
    
    public class TestSemaphore1 {
        public static void main(String[] args) {
            // 一个厕所只有3个坑位,但是有10个人来上厕所,那怎么办?假设10的人的编号分别为1-10,并且1号先到厕所,10号最后到厕所。
          那么1-3号来的时候必然有可用坑位,顺利如厕,4号来的时候需要看看前面3人是否有人出来了,如果有人出来,进去,否则等待。同样的道理,
          4-10号也需要等待正在上厕所的人出来后才能进去,并且谁先进去这得看等待的人是否有素质,是否能遵守先来先上的规则。
    Semaphore semaphore = new Semaphore(3); for (int i = 1; i <=10; i++) { Parent parent = new Parent("第"+i+"个人,",semaphore); new Thread(parent).start(); } } }

    控制台效果:

       

    四、Exchanger

      Exchanger类似于一个交换器,可以对元素进行配对和交换的线程的同步点,用于两个线程间的数据交换,线程数量必须为偶数

      具体来说,Exchanger类允许在两个线程之间定义同步点。当两个线程都到达同步点时,它们交换数据结构,因此第一个线程的数据结构进行第二个线程中,第二个线程的数据结构进入到第一个线程中;

      就像两个线程各个交换自己的数据;

    package com.zn.ConcurrentUtility;
    
    import java.util.concurrent.Exchanger;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class ExchangerTest {
        private static String str1="资源1";
        private static String str2="资源2";
        //构建资源交换对象
        private static Exchanger<String> stringExchanger=new Exchanger<>();
        public static void main(String[] args) {
            //第一个线程
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"初始占用资源:"+str1);
                //资源交换,将资源交给其他线程和获取到其他线程交换过来的资源
                try {
                    String newStr = stringExchanger.exchange(str1);
                    System.out.println(Thread.currentThread().getName()+"交换资源:"+newStr);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            //第二个线程
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"初始占用资源:"+str2);
                //资源交换,将资源交给其他线程和获取到其他线程交换过来的资源
                try {
                    String newStr = stringExchanger.exchange(str2);
                    System.out.println(Thread.currentThread().getName()+"交换资源:"+newStr);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    控制台效果:

       

      

  • 相关阅读:
    SVNKit学习——基于Repository的操作之print repository tree、file content、repository history(四)
    java操作svn【svnkit】实操
    python笔记38-使用zmail发各种邮件案例代码
    python笔记37-史上最好用的发邮件zmail
    python笔记3-邮件发送(smtplib)
    第9期《python3接口自动化测试》课程,6月29号开学!
    anyproxy学习4-Linux(Centos)搭建anyproxy环境
    anyproxy学习3-修改返回内容(beforeSendResponse)
    anyproxy学习2-rule模块实现接口mock功能
    anyproxy学习1-windows平台安装和抓手机app上https请求
  • 原文地址:https://www.cnblogs.com/Zzzzn/p/12482877.html
Copyright © 2011-2022 走看看