一 countDownlatch(减一的B琐)
1 作用:
等待一组线程执行完成后,再执行接下来的业务(加强版的join)
2 基本原理
我们申请1个常量 static countDonwLatch latch = new countDonwLatch(3);
如服务启动时,我们有3个框架服务有先跑完,最后跑主服务
每启动1个框架服务,就调用1次 latch.countdonw(1) //计数器-1
在准备要启动主服务的地方,await(); //阻塞等待的 会等到latch 的数量减为0 后,才会继续往下执行
3 实例演示
假如我们有3个初始化线程要先跑完,最后才能开始主线程工作
首先申明1个计数器常量
/** * 申明个计数器常量 大小为6 */ static CountDownLatch latch = new CountDownLatch(6);
写个初始化线程
/** * 初始化服务线程 */ class initThread extends Thread { @Override public void run() { System.out.println(); System.out.println("read to init work"); //计数器减1 latch.countDown(); System.out.println("finish to init work"); //计数器减1 latch.countDown(); } }
我们把初始化分为2步,并扣减了2次计数器数量
启动测试类
@Test public void test() { //启动3个初始化线程 new initThread().start(); new initThread().start(); new initThread().start(); try { //等待计数器数量为0 否则一直阻塞 latch.await(); } catch (Exception ex) { ex.printStackTrace(); } //开始主业务工作 System.out.println("开始主业务工作"); }
3个初始化线程,最终会扣减完成计数器内6的数量,当计数器数量为0 后,再开始主业务工作
输出结果:
read to init work
finish to init work
read to init work
finish to init work
read to init work
finish to init work
开始主业务工作
二 cyclicbarrier
1 作用:
CyclicBarrier允许一组线程在到达某个栅栏点(common barrier point)互相等待,直到最后一个线程到达栅栏点,栅栏才会打开,处于阻塞状态的线程恢复继续执行
2 实例演示 构造函数为 CyclicBarrier(int count)
当有4个工作线程,需要执行到某一处后,等待4个线程都到齐后,再继续往下执行
申明1个长度为4的CyclicBarrier 常量
/** * 申明个常量 大小为4 */ static CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
工作线程
/** * 工作线程 */ class subThread extends Thread { @Override public void run() { //随机决定是否休眠 Random random = new Random(); if (random.nextBoolean()) { try { Thread.sleep(1000); } catch (Exception ex) { ex.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "线程睡眠了1000毫秒"); } System.out.println(Thread.currentThread().getName() + " 到达await()"); try { //等待其他线程到达此处后 一起执行 cyclicBarrier.await(); //开始后续业务 System.out.println(Thread.currentThread().getName() + "执行后续业务"); } catch (Exception e) { e.printStackTrace(); } } }
测试类
@Test public void test() { //启动4个工作线程 for (int i = 0; i < 4; i++) { new subThread().start(); } try { Thread.sleep(10000); } catch (Exception ex) { ex.printStackTrace(); } }
启动了4个工作线程,最终会在 cyclicBarrier.await() 处阻塞,4个线程到期后再一起往下执行后续业务
运行结果:
Thread-2 到达await() Thread-3 到达await() Thread-1 到达await() Thread-0线程睡眠了1000毫秒 Thread-0 到达await() Thread-0执行后续业务 Thread-2执行后续业务 Thread-3执行后续业务 Thread-1执行后续业务
3 实例演示 构造函数为 CyclicBarrier(int count,runnable able);
与 CyclicBarrier(int count)相比,不同点是,当这组线程达到栅栏(屏障)处的时候,先执行able线程。
如 able 线程的实现是打印一句话 ,如下:
System.out.println("所有玩家已就绪,玩家战队开始游戏");
那上面2处实例的输出结果就是
Thread-2 到达await() Thread-3 到达await() Thread-1 到达await() Thread-0线程睡眠了1000毫秒 Thread-0 到达await() 所有玩家已就绪,玩家战队开始游戏 Thread-0执行后续业务 Thread-2执行后续业务 Thread-3执行后续业务 Thread-1执行后续业务
CyclicBarrier和CountDownLatch比较:
与CountDownLatch的区别就是是否相互等待。举一个例子,CountDownLatch就好比是马拉松比赛,跑完的人不用等待其他选手是否结束,
而CyclicBarrier需要等最后一个玩家加载结束 等10人到齐才能开始游戏。这就是区别。
三 semaphore
1 简介
Semaphore,是JDK1.5的java.util.concurrent并发包中提供的一个并发工具类。所谓Semaphore即 信号量 的意思。是一种计数器,用来保护一个或者多个共享资源的访问,如果线程要访问一个资源就必须先获得信号量。如果信号量内部计数器大于0,信号量减1,然后允许共享这个资源;否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0,当信号量使用完时,必须释放。
官方是这样解释的:
Semaphore用于限制可以访问某些资源(物理或逻辑的)的线程数目,他维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。
不过这样的解释实在有点抽象,现在用我自己的话来解释一下:
相信在学生时代都去餐厅打过饭,假如有3个窗口可以打饭,同一时刻也只能有3名同学打饭。第四个人来了之后就必须在外面等着,只要有打饭的同学好了,就可以去相应的窗口了。
比如说这张图,就全是了Semaphore的基本使用。认识一个知识点的最好方式就是直接去使用,我们干脆直接上代码来看看如何使用。
2 代码使用
我们先看Test类:
在这个代码中我们看到,主要是new了一个Semaphore,然后赋给每一位同学Student,接下来我们就来好好看看Student线程是如何实现的。
这个Student类中我们最主要看run方法的实现,首先我们通过acquire获取了当前窗口的许可,然后休眠3秒代表打饭,最后在finally使用release方法释放这个窗口许可证。代码很简单,原理很清楚,我们测试一波:
这个结果你也看到了,基本上同一时刻只能有三个学生在窗口旁边
在这里你可能有一个疑问了,Semaphore好像和synchronized关键字没什么区别,都可以实现同步,如果是这样那说明我们还没有真正理解jdk的注释,
他只是限制了访问某些资源的线程数,其实并没有实现同步,我们可以看一下:
现在我们在获取许可前增加了一条输出语句,也就是能打印出有哪个线程进入了,再去测试一波
结果很清晰,所以对于Semaphore来说,我们需要记住的其实是资源的互斥而不是资源的同步,在同一时刻是无法保证同步的,但是却可以保证资源的互斥。
信号量里的常用方法
在上面我们使用最基本的acquire方法和release方法就可以实现Semaphore最常见的功能,不过其他方法还是需要我们去了解一下的。
1 void acquire() //从信号量获取一个许可,如果无可用许可前将一直阻塞等待
2 acquire(int permits) // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。就好比是一个学生占两个窗口。这同时也对应了相应的release方法。
3 void release() // 释放一个许可,别忘了在finally中使用,注意:多次调用该方法,会使信号量的许可数增加,达到动态扩展的效果,如:初始permits为1, 调用了两次release,最大许可会改变为2
4 release(int permits) //释放给定数目的许可,将其返回到信号量。这个是对应于上面的方法,一个学生占几个窗口完事之后还要释放多少
5 availablePermits() // 返回此信号量中当前可用的许可数。也就是返回当前还有多少个窗口可用。
6 reducePermits(int reduction) // 根据指定的缩减量减小可用许可的数目。
7 hasQueuedThreads() // 查询是否有线程正在等待获取资源。
8 getQueueLength() // 返回正在等待获取的线程的估计数目。该值仅是估计的数字。
9 tryAcquire(int permits, long timeout, TimeUnit unit) // 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
10 acquireUninterruptibly(int permits) // 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
基本上常见的使用方法都在这,Semaphore底层是由AQS和Uasafe完成的,篇幅问题在这里不赘述了。感谢各位支持。
四 Exchanger
概念:
java.util.concurrent包中的Exchanger类可用于两个线程之间交换信息。可简单地将Exchanger对象理解为一个包含两个格子的容器,通过exchanger方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。
Exchanger的构造方法如下:
- Exchanger(); //创建一个新的 Exchanger。
Exchanger用到的主要方法有:
-
exchange(V x); //等待另一个线程到达此交换点(除非它被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
-
exchange(V x, long timeout, TimeUnit unit); // 等待另一个线程到达此交换点(除非它被中断,或者超出了指定的等待时间),然后将给定的对象传送给该线程,同时接收该线程的对象。
代码实现
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package test; import java.util.concurrent.Exchanger; class ExchangerTest extends Thread { private Exchanger<String> exchanger; private String string; private String threadName; public ExchangerTest(Exchanger<String> exchanger, String string, String threadName) { this.exchanger = exchanger; this.string = string; this.threadName = threadName; } public void run() { try { System.out.println(threadName + ": " + exchanger.exchange(string)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public class Test { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); ExchangerTest test1 = new ExchangerTest(exchanger, "string1", "thread-1"); ExchangerTest test2 = new ExchangerTest(exchanger, "string2", "thread-2"); test1.start(); test2.start(); } }
此程序创建了两个线程,线程在执行过程中,调用同一个exchanger对象的exchange方法,进行信息通信,当两个线程均已将信息放入到exchanger对象中时,exchanger对象会将两个线程放入的信息交换,然后返回,该程序的执行结果如下图:
另外需要注意的是,Exchanger类仅可用作两个线程的信息交换,当超过两个线程调用同一个exchanger对象时,得到的结果是随机的,exchanger对象仅关心其包含的两个“格子”是否已被填充数据,当两个格子都填充数据完成时,该对象就认为线程之间已经配对成功,然后开始执行数据交换操作。
修改上文中的示例代码,在第33行之后,再创建一个新线程test3,其中线程名称为thread-3,用于交换的字符串是string3。即添加如下的代码:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
ExchangerTest test3 = new ExchangerTest(exchanger, "string3", "thread-3"); test3.start();
此时程序的运行结果是不确定的,多次运行可以发现,配对结果可能是test1和test3交换,也可能是test1和test2交换,而剩下的未得到配对的线程,则会被阻塞,永久等待,直到与之配对的线程到达位置,对于本程序,则只能强制将其停止。