1.大纲
CountDownLatch倒计时门闩
Semaphore信号量
Condition条件对象
CyclicBarrier循环栅栏
一:控制并发流程
1.说明
作用让程序员容易得到线程之间的合作
线程之间的合作,满足业务逻辑
2.常见工具类

二:CountDownLatch的使用
1.作用

2.主要方法
仅有一个构造函数
await:调用后,线程会被挂起,然后等待count为0的时候,才进行进行执行
countDown:将count减1
3.注意点
只有调用await的的才会被等待,减1的线程不会等待,继续执行

4.典型用法一
一个线程等待多个线程都执行完成,再进行工作
package com.jun.juc.flowcontroller;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 等待全部完成,才算完成
*/
public class CountDownLatchDemo1 {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(5);
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println("No" + no + "完成了检查");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
}
};
executorService.submit(runnable);
}
System.out.println("等待中");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("完成了工作");
}
}
效果:
Connected to the target VM, address: '127.0.0.1:61918', transport: 'socket' 等待中 No1完成了检查 No5完成了检查 No4完成了检查 No3完成了检查 No2完成了检查 完成了工作 Disconnected from the target VM, address: '127.0.0.1:61918', transport: 'socket'
5.典型用法二
多个线程等待某一个线程的信号,同时进行执行
package com.jun.juc.flowcontroller;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchDemo2 {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(1);
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i=0;i<5;i++){
int no = i + 1;
Runnable ru = new Runnable(){
@Override
public void run() {
System.out.println(no + "准备完毕,等待发令枪");
try {
countDownLatch.await();
System.out.println(no +"开始跑步");
}catch (Exception e){
e.printStackTrace();
}
}
};
executorService.submit(ru);
}
try {
Thread.sleep(2000);
System.out.println("发令枪响起");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
效果:
Connected to the target VM, address: '127.0.0.1:62258', transport: 'socket' 3准备完毕,等待发令枪 1准备完毕,等待发令枪 5准备完毕,等待发令枪 4准备完毕,等待发令枪 2准备完毕,等待发令枪 发令枪响起 3开始跑步 1开始跑步 2开始跑步 4开始跑步 5开始跑步 Disconnected from the target VM, address: '127.0.0.1:62258', transport: 'socket'
6.总结
在创建实例的时候,需要传递倒数次数
倒数到0,之前等待的线程会继续的执行
不能回滚重置
三:Semaphore信号量
1.功能
可以用来限制或者管理数量有限的资源的使用

2.使用
初始化许可证的数量
代码前使用acquire或者acquireUninterruptibly方法
任务结束后,使用release释放
3.方法介绍
new Semaphore(permits, fair),可以设置是否使用公平,如果传入true,那么会把等待的线程放到FIFO的队列中,有了许可证,发给等待时间最近的线程
acquire:获取,可以相应中断
tryAcquire:看看是否有,没有,不会陷入阻塞
tryAcquire(timeOut):多了一个超时的时间
release:释放
4.程序
package com.jun.juc.flowcontroller;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
static Semaphore semaphore = new Semaphore(3, true);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i=0; i<50; i++){
executorService.submit(new Task());
}
executorService.shutdown();
}
static class Task implements Runnable{
@Override
public void run() {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"获取到了许可证");
try {
Thread.sleep((long)Math.random()*2000);
System.out.println(Thread.currentThread().getName()+"释放了许可证");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
效果:
Disconnected from the target VM, address: '127.0.0.1:54761', transport: 'socket' pool-1-thread-2获取到了许可证 pool-1-thread-3获取到了许可证 pool-1-thread-1获取到了许可证 pool-1-thread-2释放了许可证 pool-1-thread-1释放了许可证 pool-1-thread-3释放了许可证 pool-1-thread-4获取到了许可证 pool-1-thread-6获取到了许可证 pool-1-thread-4释放了许可证 pool-1-thread-6释放了许可证 pool-1-thread-7获取到了许可证 pool-1-thread-5获取到了许可证 pool-1-thread-7释放了许可证 pool-1-thread-8获取到了许可证 pool-1-thread-9获取到了许可证 pool-1-thread-5释放了许可证 pool-1-thread-8释放了许可证 pool-1-thread-12获取到了许可证 pool-1-thread-9释放了许可证 pool-1-thread-12释放了许可证 pool-1-thread-11获取到了许可证 pool-1-thread-10获取到了许可证 pool-1-thread-10释放了许可证 pool-1-thread-13获取到了许可证 pool-1-thread-15获取到了许可证 pool-1-thread-11释放了许可证 pool-1-thread-15释放了许可证 pool-1-thread-13释放了许可证 pool-1-thread-16获取到了许可证 pool-1-thread-14获取到了许可证 pool-1-thread-16释放了许可证 pool-1-thread-17获取到了许可证 pool-1-thread-18获取到了许可证 pool-1-thread-14释放了许可证 pool-1-thread-18释放了许可证 pool-1-thread-17释放了许可证 pool-1-thread-20获取到了许可证 pool-1-thread-19获取到了许可证 pool-1-thread-20释放了许可证 pool-1-thread-19释放了许可证 Process finished with exit code 0
说明:
最多有三个线程在运行
5.acquire与release方法中设置权重
里面的数字要一样
package com.jun.juc.flowcontroller;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
static Semaphore semaphore = new Semaphore(3, true);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i=0; i<20; i++){
executorService.submit(new Task());
}
executorService.shutdown();
}
static class Task implements Runnable{
@Override
public void run() {
try {
semaphore.acquire(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"获取到了许可证");
try {
Thread.sleep((long)Math.random()*2000);
System.out.println(Thread.currentThread().getName()+"释放了许可证");
semaphore.release(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
6.注意点
并不是必须由获取许可证的那个线程释放那个许可证
四:Condition接口
1.说明
又被称为条件对象
2.作用
当线程1需要等待某个条件的时候,就执行await方法,进入阻塞
直到这个条件达成的时候,线程2就执行signal,jvm就会从阻塞的线程中直到等待的线程,线程1就收到了可执行的信号,线程状态就会变成Runnable可执行状态
3.signal与signalAll的区别
signal只会唤醒等待时间最长的线程
4.基本用法
package com.jun.juc.flowcontroller;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public static void main(String[] args) {
ConditionDemo conditionDemo = new ConditionDemo();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
conditionDemo.m2();
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
conditionDemo.m1();
}
void m1(){
lock.lock();
try {
System.out.println("开始等待");
condition.await();
System.out.println("条件满足了");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
void m2(){
lock.lock();
try {
System.out.println("唤醒其他线程");
condition.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
效果:
Connected to the target VM, address: '127.0.0.1:64521', transport: 'socket' 开始等待 唤醒其他线程 条件满足了 Disconnected from the target VM, address: '127.0.0.1:64521', transport: 'socket' Process finished with exit code 0
5.实现生产者与消费者
package com.jun.juc.flowcontroller;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionProConsumer {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionProConsumer conditionProConsumer = new ConditionProConsumer();
Producer producer = conditionProConsumer.new Producer();
Consumer consumer = conditionProConsumer.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consumer();
}
private void consumer() {
while (true){
lock.lock();
try {
while (queue.size()==0){
System.out.println("队列为空");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signal();
System.out.println("取出一个数据,还有"+queue.size()+"个数据");
}finally {
lock.unlock();
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
producer();
}
private void producer() {
while (true){
lock.lock();
try {
while (queue.size()==queueSize){
System.out.println("队列已满");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signal();
System.out.println("向队列中增加了一个元素,还有"+queue.size()+"个数据");
}finally {
lock.unlock();
}
}
}
}
}
效果片段
向队列中增加了一个元素,还有6个数据 向队列中增加了一个元素,还有7个数据 向队列中增加了一个元素,还有8个数据 向队列中增加了一个元素,还有9个数据 向队列中增加了一个元素,还有10个数据 队列已满 取出一个数据,还有9个数据 取出一个数据,还有8个数据 取出一个数据,还有7个数据 取出一个数据,还有6个数据 取出一个数据,还有5个数据 取出一个数据,还有4个数据 取出一个数据,还有3个数据 取出一个数据,还有2个数据 取出一个数据,还有1个数据 取出一个数据,还有0个数据 队列为空 向队列中增加了一个元素,还有1个数据 向队列中增加了一个元素,还有2个数据 取出一个数据,还有1个数据 取出一个数据,还有0个数据 队列为空 向队列中增加了一个元素,还有1个数据 向队列中增加了一个元素,还有2个数据 向队列中增加了一个元素,还有3个数据 向队列中增加了一个元素,还有4个数据
6.Condition注意点
await方法会自动释放持有的lock锁,与Object.wait一样,不要手动释放
与wait一样,调用的时候,必须有锁,不然抛出异常
五:CyclicBarrier循环栅栏
1.说明场景
大量的线程计算不同的任务,最后汇总的时候,可以使用。线程都在集结点等待,全部到齐后,栅栏就会被撤销,所有的线程就统一再出发,继续执行下面的任务
2.基本用法
可重用性
package com.jun.juc.flowcontroller;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("都到了,统一出发");
}
});
for (int i=0;i<10;i++){
new Thread(new Task(i, cyclicBarrier)).start();
}
}
static class Task implements Runnable{
private int id;
private CyclicBarrier cyclicBarrier;
public Task(int id, CyclicBarrier cyclicBarrier){
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+id+"前往集合地点");
try {
Thread.sleep((long)Math.random()*10000);
System.out.println("线程"+id+"到了集合地点,开始等待其他人到达");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
效果
Connected to the target VM, address: '127.0.0.1:59751', transport: 'socket' 线程0前往集合地点 线程4前往集合地点 线程5前往集合地点 线程2前往集合地点 线程1前往集合地点 线程3前往集合地点 线程8前往集合地点 线程7前往集合地点 线程1到了集合地点,开始等待其他人到达 线程0到了集合地点,开始等待其他人到达 线程3到了集合地点,开始等待其他人到达 Disconnected from the target VM, address: '127.0.0.1:59751', transport: 'socket' 线程9前往集合地点 线程9到了集合地点,开始等待其他人到达 线程8到了集合地点,开始等待其他人到达 线程4到了集合地点,开始等待其他人到达 线程6前往集合地点 都到了,统一出发 线程2到了集合地点,开始等待其他人到达 线程5到了集合地点,开始等待其他人到达 线程7到了集合地点,开始等待其他人到达 线程6到了集合地点,开始等待其他人到达 都到了,统一出发 Process finished with exit code 0
等五个到齐后,就统一出发了。
3.CyclicBarrier与CountDownLatch的区别
作用不同:CyclicBarrier要等固定数量的线程到了栅栏位置才继续执行,而CountDownLatch只需要到达数字0。CountDownLatch是用于事件的,而CyclicBarrier是用于线程的
重用性不同:上面演示过