zoukankan      html  css  js  c++  java
  • AQS抽象队列同步详解、信号量、倒计数器、栅栏

    同步锁本质

    同步的方式:独享锁 - 单个队列窗口,共享锁 - 多个队列窗口

    抢锁的方式:插队抢(不公平锁)、先来后到的抢锁(公平锁)

    没抢到锁的方式:快速尝试多次(CAS自选锁)、阻塞等待

    唤醒阻塞线程的方式(叫号器):全部通知、通知下一个

    //自己实现(独享锁)
    public class DevonLock implements Lock {
    
        //1.判断一个锁的状态或者说 拥有者
        volatile AtomicReference<Thread> owner = new AtomicReference<>();
    
        //保存正在等待的线程
        volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    
        @Override
        public void lock() {
            boolean addQ = true;
            while(!tryLock()){
                if(addQ){
                    //没有拿到锁,等待,加入等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                }else{
                    //阻塞 挂起挡墙线程,不要据需往下执行
                    LockSupport.park();//伪唤醒,非unpark调用唤醒
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        @Override
        public void unlock() {
            //释放拥有者
            if (owner.compareAndSet(Thread.currentThread(),null)) {//释放成功
                //通知等待着
                Iterator<Thread> iterator = waiters.iterator();
                while(iterator.hasNext()){
                    Thread next = iterator.next();
                    LockSupport.unpark(next);//唤醒等待的线程
                }
            }
        }
    
        //尝试去获取一次锁
        @Override
        public boolean tryLock() {
            return owner.compareAndSet(null,Thread.currentThread());
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    自己实现读写锁
    public class LockDemo3 {
        volatile int i = 0;
    
        Lock lock = new DevonLock();
    
        public void add() {
            lock.lock();
            try {
                // TODO  很多业务操作
                i++;
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            LockDemo3 ld = new LockDemo3();
    
            for (int i = 0; i < 2; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 10000; j++) {
                        ld.add();
                    }
                }).start();
            }
            Thread.sleep(2000L);
            System.out.println(ld.i);
        }
    }
    验证代码

     AQS抽象队列同步器

    提供了对资源占用、释放,线程的等待、唤醒等等接口和具体实现,即,将线程释放,等待等线程方法抽象出来封装

    方法定义:

     acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
    tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
    release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
    tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。

    自己实现独享锁

    public class DevonAqs {
        // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
        // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
        // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
        // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
    
        // 1、 如何判断一个资源的拥有者
        public volatile AtomicReference<Thread> owner = new AtomicReference<>();
        // 保存 正在等待的线程
        public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    
    
        public boolean tryAcquire() {
            throw new UnsupportedOperationException();
        }
    
        //定义了资源争用的逻辑 即等待逻辑
        public void acquire(){
            boolean addQ = true;
            while(!tryAcquire()){
                if(addQ){
                    //没有拿到锁,等待,加入等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                }else{
                    //阻塞 挂起挡墙线程,不要据需往下执行
                    LockSupport.park();//伪唤醒,非unpark调用唤醒
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public boolean tryRelease() {
            throw new UnsupportedOperationException();
        }
    
        public void release(){//定义了释放资源之后的操作
            if (!tryRelease()){
                //通知等待着
                Iterator<Thread> iterator = waiters.iterator();
                while(iterator.hasNext()){
                    Thread next = iterator.next();
                    LockSupport.unpark(next);//唤醒等待的线程
                }
            }
        }
    
    
    }
    AQS自己实现
    //自己实现(独享锁)
    public class DevonLock implements Lock {
    
        DevonAqs aqs = new DevonAqs(){
            @Override
            public boolean tryAcquire() {
                return owner.compareAndSet(null,Thread.currentThread());
            }
    
            @Override
            public boolean tryRelease() {
                return owner.compareAndSet(Thread.currentThread(),null);
            }
        };
    
        //尝试去获取一次锁
        @Override
        public boolean tryLock() {
            return aqs.tryAcquire();
        };
    
    
        @Override
        public void lock() {
            aqs.acquire();
        }
    
        @Override
        public void unlock() {
            aqs.release();
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    使用AQS实现独享锁
    public class LockDemo3 {
        volatile int i = 0;
    
        Lock lock = new DevonLock();
    
        public void add() {
            lock.lock();
            try {
                // TODO  很多业务操作
                i++;
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            LockDemo3 ld = new LockDemo3();
    
            for (int i = 0; i < 3; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 10000; j++) {
                        ld.add();
                    }
                }).start();
            }
            Thread.sleep(2000L);
            System.out.println(ld.i);
        }
    }
    测试代码

    Semaphore

    信号量,控制多个线程争抢许可。

    acquire:获取一个许可,如果没有就等待

    release:释放一个许可。

    availablePermites:方法得到可用的许可项目

    // 信号量机制
    public class SemaphoreDemo {
        public static void main(String[] args) {
            SemaphoreDemo semaphoreTest = new SemaphoreDemo();
            int N = 9;            // 客人数量
            Semaphore semaphore = new Semaphore(5); // 手牌数量,限制请求数量
            for (int i = 0; i < N; i++) {
                String vipNo = "vip-00" + i;
                new Thread(() -> {
                    try {
                        semaphore.acquire(); // 获取令牌
    
                        semaphoreTest.service(vipNo);
    
                        semaphore.release(); // 释放令牌
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    
        // 限流 控制5个线程 同时访问
        public void service(String vipNo) throws InterruptedException {
            System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
            Thread.sleep(new Random().nextInt(3000));
            System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
        }
    
    }
    信号量的使用

    自己实现信号量

    public class DevonAqs {
        // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
        // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
        // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
        // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
    
        // 1、 如何判断一个资源的拥有者
        public volatile AtomicReference<Thread> owner = new AtomicReference<>();
        // 保存 正在等待的线程
        public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
        //记录资源状态
        public volatile AtomicInteger state = new AtomicInteger(0);
    
    
    
        //共享资源占用的逻辑,返回资源占用的逻辑
        public int tryAcquireShared(){
            throw new UnsupportedOperationException();
        }
    
        public void acquireShared(){
            boolean addQ = true;
            while (tryAcquireShared()<0){
                if(addQ){
                    //没有拿到锁,等待,加入等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                }else{
                    //阻塞 挂起挡墙线程,不要据需往下执行
                    LockSupport.park();//伪唤醒,非unpark调用唤醒
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public void releaseShared(){
            if (tryReleaseShared()){
                //通知等待着
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread next = iterator.next();
                    LockSupport.unpark(next); // 唤醒
                }
            }
        }
    
        public boolean tryReleaseShared() {
            throw new UnsupportedOperationException();
        }
    
    
        public AtomicInteger getState() {
            return state;
        }
    
        public void setState(AtomicInteger state) {
            this.state = state;
        }
    
        //独占资源的逻辑
    
    
        public boolean tryAcquire() {
            throw new UnsupportedOperationException();
        }
        //定义了资源争用的逻辑 即等待逻辑
    
        public void acquire(){
            boolean addQ = true;
            while(!tryAcquire()){
                if(addQ){
                    //没有拿到锁,等待,加入等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                }else{
                    //阻塞 挂起挡墙线程,不要据需往下执行
                    LockSupport.park();//伪唤醒,非unpark调用唤醒
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public boolean tryRelease() {
            throw new UnsupportedOperationException();
        }
    
        public void release(){//定义了释放资源之后的操作
            if (!tryRelease()){
                //通知等待着
                Iterator<Thread> iterator = waiters.iterator();
                while(iterator.hasNext()){
                    Thread next = iterator.next();
                    LockSupport.unpark(next);//唤醒等待的线程
                }
            }
        }
    
    }
    AQS自己实现
    public class DevonSemaphore {
    
        DevonAqs aqs = new DevonAqs(){
            @Override
            public int tryAcquireShared() {//请求之后令牌数减一 state - 1
                for (;;){
                    int count = getState().get(); //为了避免减成负数
                    int n = count -1;
                    if (count <=0 || n <0){
                        return -1;
                    }
    
                    if (getState().compareAndSet(count,n)){
                        return 1;
                    }
                }
    
            }
    
            @Override
            public boolean tryReleaseShared() {//释放之后令牌数加一 state +1
                return getState().incrementAndGet() >= 0;
            }
        };
    
        public DevonSemaphore(int count){
            aqs.getState().set(count);
        }
    
        public void acquire(){//获得令牌
            aqs.acquireShared();
        }
    
        public void release(){//释放令牌
            aqs.releaseShared();
        }
    }
    使用AQS实现信号量
    // 信号量机制
    public class SemaphoreDemo {
        public static void main(String[] args) {
            SemaphoreDemo semaphoreTest = new SemaphoreDemo();
            int N = 9;            // 客人数量
            DevonSemaphore semaphore = new DevonSemaphore(5); // 手牌数量,限制请求数量
            for (int i = 0; i < N; i++) {
                String vipNo = "vip-00" + i;
                new Thread(() -> {
                    try {
                        semaphore.acquire(); // 获取令牌
    
                        semaphoreTest.service(vipNo);
    
                        semaphore.release(); // 释放令牌
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    
        // 限流 控制5个线程 同时访问
        public void service(String vipNo) throws InterruptedException {
            System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
            Thread.sleep(new Random().nextInt(3000));
            System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
        }
    
    }
    /*
    楼上出来迎接贵宾一位,贵宾编号vip-000,...
    楼上出来迎接贵宾一位,贵宾编号vip-001,...
    楼上出来迎接贵宾一位,贵宾编号vip-002,...
    楼上出来迎接贵宾一位,贵宾编号vip-003,...
    楼上出来迎接贵宾一位,贵宾编号vip-004,...
    欢送贵宾出门,贵宾编号vip-003
    楼上出来迎接贵宾一位,贵宾编号vip-006,...
    欢送贵宾出门,贵宾编号vip-001
    楼上出来迎接贵宾一位,贵宾编号vip-005,...
    欢送贵宾出门,贵宾编号vip-004
    楼上出来迎接贵宾一位,贵宾编号vip-007,...
    欢送贵宾出门,贵宾编号vip-000
    楼上出来迎接贵宾一位,贵宾编号vip-008,...
    欢送贵宾出门,贵宾编号vip-002
    欢送贵宾出门,贵宾编号vip-007
    欢送贵宾出门,贵宾编号vip-008
    欢送贵宾出门,贵宾编号vip-006
    欢送贵宾出门,贵宾编号vip-005
     */
    测试代码

    CountDownLatch(倒计数器)

    创建对象时,传入指定数值作为线程参与的数量;

    await:方法等待计数器变为0,在这之前,线程 进入等待状态;

    countdown:计数器数值减一,直到为0;

    经常用于等待其他线程执行到某一节点,在继续执行当前线程代码。

    使用场景:

    1、统计线程执行的状况

    2、压力测试,实现最大程度的并发处理

    3、多个线程之间,相互通信,比如线程异步调用完接口,结果通知

    // CountDownLatch 自己实现
    public class CDLdemo {
        DevonAqs aqSdemo = new DevonAqs() {
            @Override
            public int tryAcquireShared() { // 如果非等于0,代表当前还有线程没准备就绪,则认为需要等待
                return this.getState().get() == 0 ? 1 : -1;
            }
    
            @Override
            public boolean tryReleaseShared() { // 如果非等于0,代表当前还有线程没准备就绪,则不会通知继续执行
                return this.getState().decrementAndGet() == 0;
            }
        };
    
        public CDLdemo(int count) {
            aqSdemo.setState(new AtomicInteger(count));
        }
    
        public void await() {
            aqSdemo.acquireShared();
        }
    
        public void countDown() {
            aqSdemo.releaseShared();
        }
    
    
        public static void main(String[] args) throws InterruptedException {
            // 一个请求,后台需要调用多个接口 查询数据
            CDLdemo cdLdemo = new CDLdemo(10); // 创建,计数数值
            for (int i = 0; i < 10; i++) { // 启动九个线程,最后一个两秒后启动
                int finalI = i;
                new Thread(() -> {
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("我是" + Thread.currentThread() + ".我执行接口-" + finalI +"调用了");
                    cdLdemo.countDown(); // 参与计数
                    // 不影响后续操作
                }).start();
            }
    
            cdLdemo.await(); // 等待计数器为0
            System.out.println("全部执行完毕.我来召唤神龙");
    
        }
    }
    /*
    我是Thread[Thread-0,5,main].我执行接口-0调用了
    我是Thread[Thread-1,5,main].我执行接口-1调用了
    我是Thread[Thread-2,5,main].我执行接口-2调用了
    我是Thread[Thread-3,5,main].我执行接口-3调用了
    我是Thread[Thread-4,5,main].我执行接口-4调用了
    我是Thread[Thread-5,5,main].我执行接口-5调用了
    我是Thread[Thread-6,5,main].我执行接口-6调用了
    我是Thread[Thread-7,5,main].我执行接口-7调用了
    我是Thread[Thread-8,5,main].我执行接口-8调用了
    我是Thread[Thread-9,5,main].我执行接口-9调用了
    全部执行完毕.我来召唤神龙
     */

    CyclicBarrier(线程栅栏)

    创建对象时,指定栅栏线程数量。

    await:等待数量的线程都处于等待状态时,继续执行后续代码。

    barrierAction:线程数量到了指定量之后,自动触发执行任务。

    和CountDownLatch的区别是,CyclicBarrier对象可多次触发执行。

    // 循环屏障(栅栏),示例:数据库批量插入
    public class CyclicBarrierTest {
        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>();
            // 任务1+2+3...1000  拆分为100个任务(1+..10,  11+20) -> 100线程去处理。
    
            // 每当有4个线程处于await状态的时候,则会触发barrierAction执行
            CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
                @Override
                public void run() {
                    // 这是每满足4次数据库操作,就触发一次批量执行
                    System.out.println("有4个线程执行了,开始批量插入: " + Thread.currentThread());
                    for (int i = 0; i < 4; i++) {
                        System.out.println(sqls.poll());
                    }
                }
            });
    
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    try {
                        sqls.add("data - " + Thread.currentThread()); // 缓存起来
                        Thread.sleep(1000L); // 模拟数据库操作耗时
                        barrier.await(); // 等待栅栏打开,有4个线程都执行到这段代码的时候,才会继续往下执行
                        System.out.println(Thread.currentThread() + "插入完毕");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
    
            Thread.sleep(2000);
        }
    }
    /*
    有4个线程执行了,开始批量插入: Thread[Thread-4,5,main]
    data - Thread[Thread-0,5,main]
    data - Thread[Thread-1,5,main]
    data - Thread[Thread-2,5,main]
    data - Thread[Thread-3,5,main]
    Thread[Thread-4,5,main]插入完毕
    Thread[Thread-2,5,main]插入完毕
    Thread[Thread-3,5,main]插入完毕
    Thread[Thread-0,5,main]插入完毕
    有4个线程执行了,开始批量插入: Thread[Thread-7,5,main]
    data - Thread[Thread-4,5,main]
    data - Thread[Thread-5,5,main]
    data - Thread[Thread-6,5,main]
    data - Thread[Thread-7,5,main]
    Thread[Thread-7,5,main]插入完毕
    Thread[Thread-6,5,main]插入完毕
    Thread[Thread-5,5,main]插入完毕
    Thread[Thread-1,5,main]插入完毕
     */
    代码示例

      

    以上部分内容代码摘自网易云课堂

  • 相关阅读:
    深入理解 CSS3 弹性盒布局模型
    【css基础】文本对齐,水平对齐,垂直对齐
    使用百度地图API进行坐标系转换
    dtgrid 手动条件删除表格中的某一行
    短时间生成大量不重复随机数字
    Android与.Net交互模拟用户屏幕操作添加APN和网络4G/3G切换
    (三)SpringBoot定义统一返回result对象
    (二)SpringBoot整合常用框架Druid连接池
    (一)搭建自己的SpringBoot后台框架整合MyBatis
    插件-鼠标或手指滑动事件
  • 原文地址:https://www.cnblogs.com/shuzhixia/p/13361003.html
Copyright © 2011-2022 走看看