同步锁本质
同步的方式:独享锁 - 单个队列窗口,共享锁 - 多个队列窗口
抢锁的方式:插队抢(不公平锁)、先来后到的抢锁(公平锁)
没抢到锁的方式:快速尝试多次(CAS自选锁)、阻塞等待
唤醒阻塞线程的方式(叫号器):全部通知、通知下一个
//自己实现(独享锁) public class DevonLock implements Lock { //1.判断一个锁的状态或者说 拥有者 volatile AtomicReference<Thread> owner = new AtomicReference<>(); //保存正在等待的线程 volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(); @Override public void lock() { boolean addQ = true; while(!tryLock()){ if(addQ){ //没有拿到锁,等待,加入等待集合 waiters.offer(Thread.currentThread()); addQ = false; }else{ //阻塞 挂起挡墙线程,不要据需往下执行 LockSupport.park();//伪唤醒,非unpark调用唤醒 } } waiters.remove(Thread.currentThread()); } @Override public void unlock() { //释放拥有者 if (owner.compareAndSet(Thread.currentThread(),null)) {//释放成功 //通知等待着 Iterator<Thread> iterator = waiters.iterator(); while(iterator.hasNext()){ Thread next = iterator.next(); LockSupport.unpark(next);//唤醒等待的线程 } } } //尝试去获取一次锁 @Override public boolean tryLock() { return owner.compareAndSet(null,Thread.currentThread()); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
public class LockDemo3 { volatile int i = 0; Lock lock = new DevonLock(); public void add() { lock.lock(); try { // TODO 很多业务操作 i++; }finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { LockDemo3 ld = new LockDemo3(); for (int i = 0; i < 2; i++) { new Thread(() -> { for (int j = 0; j < 10000; j++) { ld.add(); } }).start(); } Thread.sleep(2000L); System.out.println(ld.i); } }
AQS抽象队列同步器
提供了对资源占用、释放,线程的等待、唤醒等等接口和具体实现,即,将线程释放,等待等线程方法抽象出来封装
方法定义:
acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
自己实现独享锁
public class DevonAqs { // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。 // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。 // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。 // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。 // 1、 如何判断一个资源的拥有者 public volatile AtomicReference<Thread> owner = new AtomicReference<>(); // 保存 正在等待的线程 public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(); public boolean tryAcquire() { throw new UnsupportedOperationException(); } //定义了资源争用的逻辑 即等待逻辑 public void acquire(){ boolean addQ = true; while(!tryAcquire()){ if(addQ){ //没有拿到锁,等待,加入等待集合 waiters.offer(Thread.currentThread()); addQ = false; }else{ //阻塞 挂起挡墙线程,不要据需往下执行 LockSupport.park();//伪唤醒,非unpark调用唤醒 } } waiters.remove(Thread.currentThread()); } public boolean tryRelease() { throw new UnsupportedOperationException(); } public void release(){//定义了释放资源之后的操作 if (!tryRelease()){ //通知等待着 Iterator<Thread> iterator = waiters.iterator(); while(iterator.hasNext()){ Thread next = iterator.next(); LockSupport.unpark(next);//唤醒等待的线程 } } } }
//自己实现(独享锁) public class DevonLock implements Lock { DevonAqs aqs = new DevonAqs(){ @Override public boolean tryAcquire() { return owner.compareAndSet(null,Thread.currentThread()); } @Override public boolean tryRelease() { return owner.compareAndSet(Thread.currentThread(),null); } }; //尝试去获取一次锁 @Override public boolean tryLock() { return aqs.tryAcquire(); }; @Override public void lock() { aqs.acquire(); } @Override public void unlock() { aqs.release(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
public class LockDemo3 { volatile int i = 0; Lock lock = new DevonLock(); public void add() { lock.lock(); try { // TODO 很多业务操作 i++; }finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { LockDemo3 ld = new LockDemo3(); for (int i = 0; i < 3; i++) { new Thread(() -> { for (int j = 0; j < 10000; j++) { ld.add(); } }).start(); } Thread.sleep(2000L); System.out.println(ld.i); } }
Semaphore
信号量,控制多个线程争抢许可。
acquire:获取一个许可,如果没有就等待
release:释放一个许可。
availablePermites:方法得到可用的许可项目
// 信号量机制 public class SemaphoreDemo { public static void main(String[] args) { SemaphoreDemo semaphoreTest = new SemaphoreDemo(); int N = 9; // 客人数量 Semaphore semaphore = new Semaphore(5); // 手牌数量,限制请求数量 for (int i = 0; i < N; i++) { String vipNo = "vip-00" + i; new Thread(() -> { try { semaphore.acquire(); // 获取令牌 semaphoreTest.service(vipNo); semaphore.release(); // 释放令牌 } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } // 限流 控制5个线程 同时访问 public void service(String vipNo) throws InterruptedException { System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",..."); Thread.sleep(new Random().nextInt(3000)); System.out.println("欢送贵宾出门,贵宾编号" + vipNo); } }
自己实现信号量
public class DevonAqs { // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。 // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。 // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。 // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。 // 1、 如何判断一个资源的拥有者 public volatile AtomicReference<Thread> owner = new AtomicReference<>(); // 保存 正在等待的线程 public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(); //记录资源状态 public volatile AtomicInteger state = new AtomicInteger(0); //共享资源占用的逻辑,返回资源占用的逻辑 public int tryAcquireShared(){ throw new UnsupportedOperationException(); } public void acquireShared(){ boolean addQ = true; while (tryAcquireShared()<0){ if(addQ){ //没有拿到锁,等待,加入等待集合 waiters.offer(Thread.currentThread()); addQ = false; }else{ //阻塞 挂起挡墙线程,不要据需往下执行 LockSupport.park();//伪唤醒,非unpark调用唤醒 } } waiters.remove(Thread.currentThread()); } public void releaseShared(){ if (tryReleaseShared()){ //通知等待着 Iterator<Thread> iterator = waiters.iterator(); while (iterator.hasNext()) { Thread next = iterator.next(); LockSupport.unpark(next); // 唤醒 } } } public boolean tryReleaseShared() { throw new UnsupportedOperationException(); } public AtomicInteger getState() { return state; } public void setState(AtomicInteger state) { this.state = state; } //独占资源的逻辑 public boolean tryAcquire() { throw new UnsupportedOperationException(); } //定义了资源争用的逻辑 即等待逻辑 public void acquire(){ boolean addQ = true; while(!tryAcquire()){ if(addQ){ //没有拿到锁,等待,加入等待集合 waiters.offer(Thread.currentThread()); addQ = false; }else{ //阻塞 挂起挡墙线程,不要据需往下执行 LockSupport.park();//伪唤醒,非unpark调用唤醒 } } waiters.remove(Thread.currentThread()); } public boolean tryRelease() { throw new UnsupportedOperationException(); } public void release(){//定义了释放资源之后的操作 if (!tryRelease()){ //通知等待着 Iterator<Thread> iterator = waiters.iterator(); while(iterator.hasNext()){ Thread next = iterator.next(); LockSupport.unpark(next);//唤醒等待的线程 } } } }
public class DevonSemaphore { DevonAqs aqs = new DevonAqs(){ @Override public int tryAcquireShared() {//请求之后令牌数减一 state - 1 for (;;){ int count = getState().get(); //为了避免减成负数 int n = count -1; if (count <=0 || n <0){ return -1; } if (getState().compareAndSet(count,n)){ return 1; } } } @Override public boolean tryReleaseShared() {//释放之后令牌数加一 state +1 return getState().incrementAndGet() >= 0; } }; public DevonSemaphore(int count){ aqs.getState().set(count); } public void acquire(){//获得令牌 aqs.acquireShared(); } public void release(){//释放令牌 aqs.releaseShared(); } }
// 信号量机制 public class SemaphoreDemo { public static void main(String[] args) { SemaphoreDemo semaphoreTest = new SemaphoreDemo(); int N = 9; // 客人数量 DevonSemaphore semaphore = new DevonSemaphore(5); // 手牌数量,限制请求数量 for (int i = 0; i < N; i++) { String vipNo = "vip-00" + i; new Thread(() -> { try { semaphore.acquire(); // 获取令牌 semaphoreTest.service(vipNo); semaphore.release(); // 释放令牌 } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } // 限流 控制5个线程 同时访问 public void service(String vipNo) throws InterruptedException { System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",..."); Thread.sleep(new Random().nextInt(3000)); System.out.println("欢送贵宾出门,贵宾编号" + vipNo); } } /* 楼上出来迎接贵宾一位,贵宾编号vip-000,... 楼上出来迎接贵宾一位,贵宾编号vip-001,... 楼上出来迎接贵宾一位,贵宾编号vip-002,... 楼上出来迎接贵宾一位,贵宾编号vip-003,... 楼上出来迎接贵宾一位,贵宾编号vip-004,... 欢送贵宾出门,贵宾编号vip-003 楼上出来迎接贵宾一位,贵宾编号vip-006,... 欢送贵宾出门,贵宾编号vip-001 楼上出来迎接贵宾一位,贵宾编号vip-005,... 欢送贵宾出门,贵宾编号vip-004 楼上出来迎接贵宾一位,贵宾编号vip-007,... 欢送贵宾出门,贵宾编号vip-000 楼上出来迎接贵宾一位,贵宾编号vip-008,... 欢送贵宾出门,贵宾编号vip-002 欢送贵宾出门,贵宾编号vip-007 欢送贵宾出门,贵宾编号vip-008 欢送贵宾出门,贵宾编号vip-006 欢送贵宾出门,贵宾编号vip-005 */
CountDownLatch(倒计数器)
创建对象时,传入指定数值作为线程参与的数量;
await:方法等待计数器变为0,在这之前,线程 进入等待状态;
countdown:计数器数值减一,直到为0;
经常用于等待其他线程执行到某一节点,在继续执行当前线程代码。
使用场景:
1、统计线程执行的状况
2、压力测试,实现最大程度的并发处理
3、多个线程之间,相互通信,比如线程异步调用完接口,结果通知
// CountDownLatch 自己实现 public class CDLdemo { DevonAqs aqSdemo = new DevonAqs() { @Override public int tryAcquireShared() { // 如果非等于0,代表当前还有线程没准备就绪,则认为需要等待 return this.getState().get() == 0 ? 1 : -1; } @Override public boolean tryReleaseShared() { // 如果非等于0,代表当前还有线程没准备就绪,则不会通知继续执行 return this.getState().decrementAndGet() == 0; } }; public CDLdemo(int count) { aqSdemo.setState(new AtomicInteger(count)); } public void await() { aqSdemo.acquireShared(); } public void countDown() { aqSdemo.releaseShared(); } public static void main(String[] args) throws InterruptedException { // 一个请求,后台需要调用多个接口 查询数据 CDLdemo cdLdemo = new CDLdemo(10); // 创建,计数数值 for (int i = 0; i < 10; i++) { // 启动九个线程,最后一个两秒后启动 int finalI = i; new Thread(() -> { try { Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("我是" + Thread.currentThread() + ".我执行接口-" + finalI +"调用了"); cdLdemo.countDown(); // 参与计数 // 不影响后续操作 }).start(); } cdLdemo.await(); // 等待计数器为0 System.out.println("全部执行完毕.我来召唤神龙"); } } /* 我是Thread[Thread-0,5,main].我执行接口-0调用了 我是Thread[Thread-1,5,main].我执行接口-1调用了 我是Thread[Thread-2,5,main].我执行接口-2调用了 我是Thread[Thread-3,5,main].我执行接口-3调用了 我是Thread[Thread-4,5,main].我执行接口-4调用了 我是Thread[Thread-5,5,main].我执行接口-5调用了 我是Thread[Thread-6,5,main].我执行接口-6调用了 我是Thread[Thread-7,5,main].我执行接口-7调用了 我是Thread[Thread-8,5,main].我执行接口-8调用了 我是Thread[Thread-9,5,main].我执行接口-9调用了 全部执行完毕.我来召唤神龙 */
CyclicBarrier(线程栅栏)
创建对象时,指定栅栏线程数量。
await:等待数量的线程都处于等待状态时,继续执行后续代码。
barrierAction:线程数量到了指定量之后,自动触发执行任务。
和CountDownLatch的区别是,CyclicBarrier对象可多次触发执行。
// 循环屏障(栅栏),示例:数据库批量插入 public class CyclicBarrierTest { public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>(); // 任务1+2+3...1000 拆分为100个任务(1+..10, 11+20) -> 100线程去处理。 // 每当有4个线程处于await状态的时候,则会触发barrierAction执行 CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() { @Override public void run() { // 这是每满足4次数据库操作,就触发一次批量执行 System.out.println("有4个线程执行了,开始批量插入: " + Thread.currentThread()); for (int i = 0; i < 4; i++) { System.out.println(sqls.poll()); } } }); for (int i = 0; i < 10; i++) { new Thread(() -> { try { sqls.add("data - " + Thread.currentThread()); // 缓存起来 Thread.sleep(1000L); // 模拟数据库操作耗时 barrier.await(); // 等待栅栏打开,有4个线程都执行到这段代码的时候,才会继续往下执行 System.out.println(Thread.currentThread() + "插入完毕"); } catch (Exception e) { e.printStackTrace(); } }).start(); } Thread.sleep(2000); } } /* 有4个线程执行了,开始批量插入: Thread[Thread-4,5,main] data - Thread[Thread-0,5,main] data - Thread[Thread-1,5,main] data - Thread[Thread-2,5,main] data - Thread[Thread-3,5,main] Thread[Thread-4,5,main]插入完毕 Thread[Thread-2,5,main]插入完毕 Thread[Thread-3,5,main]插入完毕 Thread[Thread-0,5,main]插入完毕 有4个线程执行了,开始批量插入: Thread[Thread-7,5,main] data - Thread[Thread-4,5,main] data - Thread[Thread-5,5,main] data - Thread[Thread-6,5,main] data - Thread[Thread-7,5,main] Thread[Thread-7,5,main]插入完毕 Thread[Thread-6,5,main]插入完毕 Thread[Thread-5,5,main]插入完毕 Thread[Thread-1,5,main]插入完毕 */
以上部分内容代码摘自网易云课堂