zoukankan      html  css  js  c++  java
  • AQS抽象队列同步详解、信号量、倒计数器、栅栏

    同步锁本质

    同步的方式:独享锁 - 单个队列窗口,共享锁 - 多个队列窗口

    抢锁的方式:插队抢(不公平锁)、先来后到的抢锁(公平锁)

    没抢到锁的方式:快速尝试多次(CAS自选锁)、阻塞等待

    唤醒阻塞线程的方式(叫号器):全部通知、通知下一个

    //自己实现(独享锁)
    public class DevonLock implements Lock {
    
        //1.判断一个锁的状态或者说 拥有者
        volatile AtomicReference<Thread> owner = new AtomicReference<>();
    
        //保存正在等待的线程
        volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    
        @Override
        public void lock() {
            boolean addQ = true;
            while(!tryLock()){
                if(addQ){
                    //没有拿到锁,等待,加入等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                }else{
                    //阻塞 挂起挡墙线程,不要据需往下执行
                    LockSupport.park();//伪唤醒,非unpark调用唤醒
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        @Override
        public void unlock() {
            //释放拥有者
            if (owner.compareAndSet(Thread.currentThread(),null)) {//释放成功
                //通知等待着
                Iterator<Thread> iterator = waiters.iterator();
                while(iterator.hasNext()){
                    Thread next = iterator.next();
                    LockSupport.unpark(next);//唤醒等待的线程
                }
            }
        }
    
        //尝试去获取一次锁
        @Override
        public boolean tryLock() {
            return owner.compareAndSet(null,Thread.currentThread());
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    自己实现读写锁
    public class LockDemo3 {
        volatile int i = 0;
    
        Lock lock = new DevonLock();
    
        public void add() {
            lock.lock();
            try {
                // TODO  很多业务操作
                i++;
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            LockDemo3 ld = new LockDemo3();
    
            for (int i = 0; i < 2; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 10000; j++) {
                        ld.add();
                    }
                }).start();
            }
            Thread.sleep(2000L);
            System.out.println(ld.i);
        }
    }
    验证代码

     AQS抽象队列同步器

    提供了对资源占用、释放,线程的等待、唤醒等等接口和具体实现,即,将线程释放,等待等线程方法抽象出来封装

    方法定义:

     acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
    tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
    release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
    tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。

    自己实现独享锁

    public class DevonAqs {
        // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
        // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
        // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
        // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
    
        // 1、 如何判断一个资源的拥有者
        public volatile AtomicReference<Thread> owner = new AtomicReference<>();
        // 保存 正在等待的线程
        public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    
    
        public boolean tryAcquire() {
            throw new UnsupportedOperationException();
        }
    
        //定义了资源争用的逻辑 即等待逻辑
        public void acquire(){
            boolean addQ = true;
            while(!tryAcquire()){
                if(addQ){
                    //没有拿到锁,等待,加入等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                }else{
                    //阻塞 挂起挡墙线程,不要据需往下执行
                    LockSupport.park();//伪唤醒,非unpark调用唤醒
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public boolean tryRelease() {
            throw new UnsupportedOperationException();
        }
    
        public void release(){//定义了释放资源之后的操作
            if (!tryRelease()){
                //通知等待着
                Iterator<Thread> iterator = waiters.iterator();
                while(iterator.hasNext()){
                    Thread next = iterator.next();
                    LockSupport.unpark(next);//唤醒等待的线程
                }
            }
        }
    
    
    }
    AQS自己实现
    //自己实现(独享锁)
    public class DevonLock implements Lock {
    
        DevonAqs aqs = new DevonAqs(){
            @Override
            public boolean tryAcquire() {
                return owner.compareAndSet(null,Thread.currentThread());
            }
    
            @Override
            public boolean tryRelease() {
                return owner.compareAndSet(Thread.currentThread(),null);
            }
        };
    
        //尝试去获取一次锁
        @Override
        public boolean tryLock() {
            return aqs.tryAcquire();
        };
    
    
        @Override
        public void lock() {
            aqs.acquire();
        }
    
        @Override
        public void unlock() {
            aqs.release();
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    使用AQS实现独享锁
    public class LockDemo3 {
        volatile int i = 0;
    
        Lock lock = new DevonLock();
    
        public void add() {
            lock.lock();
            try {
                // TODO  很多业务操作
                i++;
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            LockDemo3 ld = new LockDemo3();
    
            for (int i = 0; i < 3; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 10000; j++) {
                        ld.add();
                    }
                }).start();
            }
            Thread.sleep(2000L);
            System.out.println(ld.i);
        }
    }
    测试代码

    Semaphore

    信号量,控制多个线程争抢许可。

    acquire:获取一个许可,如果没有就等待

    release:释放一个许可。

    availablePermites:方法得到可用的许可项目

    // 信号量机制
    public class SemaphoreDemo {
        public static void main(String[] args) {
            SemaphoreDemo semaphoreTest = new SemaphoreDemo();
            int N = 9;            // 客人数量
            Semaphore semaphore = new Semaphore(5); // 手牌数量,限制请求数量
            for (int i = 0; i < N; i++) {
                String vipNo = "vip-00" + i;
                new Thread(() -> {
                    try {
                        semaphore.acquire(); // 获取令牌
    
                        semaphoreTest.service(vipNo);
    
                        semaphore.release(); // 释放令牌
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    
        // 限流 控制5个线程 同时访问
        public void service(String vipNo) throws InterruptedException {
            System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
            Thread.sleep(new Random().nextInt(3000));
            System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
        }
    
    }
    信号量的使用

    自己实现信号量

    public class DevonAqs {
        // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
        // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
        // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
        // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
    
        // 1、 如何判断一个资源的拥有者
        public volatile AtomicReference<Thread> owner = new AtomicReference<>();
        // 保存 正在等待的线程
        public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
        //记录资源状态
        public volatile AtomicInteger state = new AtomicInteger(0);
    
    
    
        //共享资源占用的逻辑,返回资源占用的逻辑
        public int tryAcquireShared(){
            throw new UnsupportedOperationException();
        }
    
        public void acquireShared(){
            boolean addQ = true;
            while (tryAcquireShared()<0){
                if(addQ){
                    //没有拿到锁,等待,加入等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                }else{
                    //阻塞 挂起挡墙线程,不要据需往下执行
                    LockSupport.park();//伪唤醒,非unpark调用唤醒
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public void releaseShared(){
            if (tryReleaseShared()){
                //通知等待着
                Iterator<Thread> iterator = waiters.iterator();
                while (iterator.hasNext()) {
                    Thread next = iterator.next();
                    LockSupport.unpark(next); // 唤醒
                }
            }
        }
    
        public boolean tryReleaseShared() {
            throw new UnsupportedOperationException();
        }
    
    
        public AtomicInteger getState() {
            return state;
        }
    
        public void setState(AtomicInteger state) {
            this.state = state;
        }
    
        //独占资源的逻辑
    
    
        public boolean tryAcquire() {
            throw new UnsupportedOperationException();
        }
        //定义了资源争用的逻辑 即等待逻辑
    
        public void acquire(){
            boolean addQ = true;
            while(!tryAcquire()){
                if(addQ){
                    //没有拿到锁,等待,加入等待集合
                    waiters.offer(Thread.currentThread());
                    addQ = false;
                }else{
                    //阻塞 挂起挡墙线程,不要据需往下执行
                    LockSupport.park();//伪唤醒,非unpark调用唤醒
                }
            }
            waiters.remove(Thread.currentThread());
        }
    
        public boolean tryRelease() {
            throw new UnsupportedOperationException();
        }
    
        public void release(){//定义了释放资源之后的操作
            if (!tryRelease()){
                //通知等待着
                Iterator<Thread> iterator = waiters.iterator();
                while(iterator.hasNext()){
                    Thread next = iterator.next();
                    LockSupport.unpark(next);//唤醒等待的线程
                }
            }
        }
    
    }
    AQS自己实现
    public class DevonSemaphore {
    
        DevonAqs aqs = new DevonAqs(){
            @Override
            public int tryAcquireShared() {//请求之后令牌数减一 state - 1
                for (;;){
                    int count = getState().get(); //为了避免减成负数
                    int n = count -1;
                    if (count <=0 || n <0){
                        return -1;
                    }
    
                    if (getState().compareAndSet(count,n)){
                        return 1;
                    }
                }
    
            }
    
            @Override
            public boolean tryReleaseShared() {//释放之后令牌数加一 state +1
                return getState().incrementAndGet() >= 0;
            }
        };
    
        public DevonSemaphore(int count){
            aqs.getState().set(count);
        }
    
        public void acquire(){//获得令牌
            aqs.acquireShared();
        }
    
        public void release(){//释放令牌
            aqs.releaseShared();
        }
    }
    使用AQS实现信号量
    // 信号量机制
    public class SemaphoreDemo {
        public static void main(String[] args) {
            SemaphoreDemo semaphoreTest = new SemaphoreDemo();
            int N = 9;            // 客人数量
            DevonSemaphore semaphore = new DevonSemaphore(5); // 手牌数量,限制请求数量
            for (int i = 0; i < N; i++) {
                String vipNo = "vip-00" + i;
                new Thread(() -> {
                    try {
                        semaphore.acquire(); // 获取令牌
    
                        semaphoreTest.service(vipNo);
    
                        semaphore.release(); // 释放令牌
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    
        // 限流 控制5个线程 同时访问
        public void service(String vipNo) throws InterruptedException {
            System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
            Thread.sleep(new Random().nextInt(3000));
            System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
        }
    
    }
    /*
    楼上出来迎接贵宾一位,贵宾编号vip-000,...
    楼上出来迎接贵宾一位,贵宾编号vip-001,...
    楼上出来迎接贵宾一位,贵宾编号vip-002,...
    楼上出来迎接贵宾一位,贵宾编号vip-003,...
    楼上出来迎接贵宾一位,贵宾编号vip-004,...
    欢送贵宾出门,贵宾编号vip-003
    楼上出来迎接贵宾一位,贵宾编号vip-006,...
    欢送贵宾出门,贵宾编号vip-001
    楼上出来迎接贵宾一位,贵宾编号vip-005,...
    欢送贵宾出门,贵宾编号vip-004
    楼上出来迎接贵宾一位,贵宾编号vip-007,...
    欢送贵宾出门,贵宾编号vip-000
    楼上出来迎接贵宾一位,贵宾编号vip-008,...
    欢送贵宾出门,贵宾编号vip-002
    欢送贵宾出门,贵宾编号vip-007
    欢送贵宾出门,贵宾编号vip-008
    欢送贵宾出门,贵宾编号vip-006
    欢送贵宾出门,贵宾编号vip-005
     */
    测试代码

    CountDownLatch(倒计数器)

    创建对象时,传入指定数值作为线程参与的数量;

    await:方法等待计数器变为0,在这之前,线程 进入等待状态;

    countdown:计数器数值减一,直到为0;

    经常用于等待其他线程执行到某一节点,在继续执行当前线程代码。

    使用场景:

    1、统计线程执行的状况

    2、压力测试,实现最大程度的并发处理

    3、多个线程之间,相互通信,比如线程异步调用完接口,结果通知

    // CountDownLatch 自己实现
    public class CDLdemo {
        DevonAqs aqSdemo = new DevonAqs() {
            @Override
            public int tryAcquireShared() { // 如果非等于0,代表当前还有线程没准备就绪,则认为需要等待
                return this.getState().get() == 0 ? 1 : -1;
            }
    
            @Override
            public boolean tryReleaseShared() { // 如果非等于0,代表当前还有线程没准备就绪,则不会通知继续执行
                return this.getState().decrementAndGet() == 0;
            }
        };
    
        public CDLdemo(int count) {
            aqSdemo.setState(new AtomicInteger(count));
        }
    
        public void await() {
            aqSdemo.acquireShared();
        }
    
        public void countDown() {
            aqSdemo.releaseShared();
        }
    
    
        public static void main(String[] args) throws InterruptedException {
            // 一个请求,后台需要调用多个接口 查询数据
            CDLdemo cdLdemo = new CDLdemo(10); // 创建,计数数值
            for (int i = 0; i < 10; i++) { // 启动九个线程,最后一个两秒后启动
                int finalI = i;
                new Thread(() -> {
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("我是" + Thread.currentThread() + ".我执行接口-" + finalI +"调用了");
                    cdLdemo.countDown(); // 参与计数
                    // 不影响后续操作
                }).start();
            }
    
            cdLdemo.await(); // 等待计数器为0
            System.out.println("全部执行完毕.我来召唤神龙");
    
        }
    }
    /*
    我是Thread[Thread-0,5,main].我执行接口-0调用了
    我是Thread[Thread-1,5,main].我执行接口-1调用了
    我是Thread[Thread-2,5,main].我执行接口-2调用了
    我是Thread[Thread-3,5,main].我执行接口-3调用了
    我是Thread[Thread-4,5,main].我执行接口-4调用了
    我是Thread[Thread-5,5,main].我执行接口-5调用了
    我是Thread[Thread-6,5,main].我执行接口-6调用了
    我是Thread[Thread-7,5,main].我执行接口-7调用了
    我是Thread[Thread-8,5,main].我执行接口-8调用了
    我是Thread[Thread-9,5,main].我执行接口-9调用了
    全部执行完毕.我来召唤神龙
     */

    CyclicBarrier(线程栅栏)

    创建对象时,指定栅栏线程数量。

    await:等待数量的线程都处于等待状态时,继续执行后续代码。

    barrierAction:线程数量到了指定量之后,自动触发执行任务。

    和CountDownLatch的区别是,CyclicBarrier对象可多次触发执行。

    // 循环屏障(栅栏),示例:数据库批量插入
    public class CyclicBarrierTest {
        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>();
            // 任务1+2+3...1000  拆分为100个任务(1+..10,  11+20) -> 100线程去处理。
    
            // 每当有4个线程处于await状态的时候,则会触发barrierAction执行
            CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
                @Override
                public void run() {
                    // 这是每满足4次数据库操作,就触发一次批量执行
                    System.out.println("有4个线程执行了,开始批量插入: " + Thread.currentThread());
                    for (int i = 0; i < 4; i++) {
                        System.out.println(sqls.poll());
                    }
                }
            });
    
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    try {
                        sqls.add("data - " + Thread.currentThread()); // 缓存起来
                        Thread.sleep(1000L); // 模拟数据库操作耗时
                        barrier.await(); // 等待栅栏打开,有4个线程都执行到这段代码的时候,才会继续往下执行
                        System.out.println(Thread.currentThread() + "插入完毕");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();
            }
    
            Thread.sleep(2000);
        }
    }
    /*
    有4个线程执行了,开始批量插入: Thread[Thread-4,5,main]
    data - Thread[Thread-0,5,main]
    data - Thread[Thread-1,5,main]
    data - Thread[Thread-2,5,main]
    data - Thread[Thread-3,5,main]
    Thread[Thread-4,5,main]插入完毕
    Thread[Thread-2,5,main]插入完毕
    Thread[Thread-3,5,main]插入完毕
    Thread[Thread-0,5,main]插入完毕
    有4个线程执行了,开始批量插入: Thread[Thread-7,5,main]
    data - Thread[Thread-4,5,main]
    data - Thread[Thread-5,5,main]
    data - Thread[Thread-6,5,main]
    data - Thread[Thread-7,5,main]
    Thread[Thread-7,5,main]插入完毕
    Thread[Thread-6,5,main]插入完毕
    Thread[Thread-5,5,main]插入完毕
    Thread[Thread-1,5,main]插入完毕
     */
    代码示例

      

    以上部分内容代码摘自网易云课堂

  • 相关阅读:
    Codeforces Round #344 (Div. 2) C. Report 其他
    Codeforces Round #344 (Div. 2) B. Print Check 水题
    Codeforces Round #344 (Div. 2) A. Interview 水题
    8VC Venture Cup 2016
    CDOJ 1280 772002画马尾 每周一题 div1 矩阵快速幂 中二版
    CDOJ 1280 772002画马尾 每周一题 div1 矩阵快速幂
    CDOJ 1279 班委选举 每周一题 div2 暴力
    每周算法讲堂 快速幂
    8VC Venture Cup 2016
    Educational Codeforces Round 9 F. Magic Matrix 最小生成树
  • 原文地址:https://www.cnblogs.com/shuzhixia/p/13361003.html
Copyright © 2011-2022 走看看