zoukankan      html  css  js  c++  java
  • 使用递增计数器的线程同步工具 —— 信号量,它的原理是什么样子的?

    前言

    在 JUC 中线程同步器除了 CountDownLatch 和 CycleBarrier ,还有一个叫做 Semaphore (信号量),同样是基于 AQS 实现的。下面来看看信号量的内部原理。

    介绍

    一个计数信号量。 从概念上讲,信号量维护了一组许可。 如果有必要,在许可可用之前调用 acquire 方法会被阻塞,直到许可证可用。 调用 release 方法会增加了一个许可证,从而释放被阻塞的线程。

    1. 声明时指定初始许可数量。
    2. 调用 acquire(int permits) 方法,指定目标许可数量。
    3. 调用 release(int permits) 方法,发布指定的许可数量。

    在许可数量没有到达指定目标数量时,调用 acquire 方法的线程会被阻塞。

    基本使用

    public class SemaphoreTest1 {
    
        private static final Semaphore SEMAPHORE = new Semaphore(0);
    
        public static void main(String[] args) throws InterruptedException {
    
            ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(1024),
                    new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
                    new ThreadPoolExecutor.AbortPolicy());
    
    
            for (int i = 0; i < 5; i++) {
    
                pool.submit(() -> {
    
                    try {
                        Thread.sleep(1000 + new Random().nextInt(1000));
                    } catch (InterruptedException ignored) {
                    }
    
                    System.out.println("当前线程: " + Thread.currentThread().getName() + " 发布一个许可");
                    SEMAPHORE.release(1);
    
                });
            }
    
            System.out.println("-----> 这里是主线程");
    
            SEMAPHORE.acquire(5);
    
            System.out.println("-----> 主线程执行完毕");
    
            pool.shutdown();
        }
    
    }
    
    -----> 这里是主线程
    当前线程: Thread-pool-2 发布一个许可
    当前线程: Thread-pool-4 发布一个许可
    当前线程: Thread-pool-1 发布一个许可
    当前线程: Thread-pool-0 发布一个许可
    当前线程: Thread-pool-3 发布一个许可
    -----> 主线程执行完毕
    

    上面这个方法也是模拟了类似 CountDownLatch 的用法, 在子线程执行完毕之后,主线程继续执行。只不过 Semaphore 和 CountDownLatch 区别最大的是:

    Semaphore 是从指定数值开始增加,直到到达许可数量,然后被阻塞线程开始继续执行。

    CountDownLatch 是从指定数量的线程开始减少,直到为 0 时,被阻塞的线程开始继续执行。

    当然这只是最简单的用法,除此让主线程等待,同样也可以让其他线程等待,然后再开始执行。

    问题疑问

    1. Semaphore 和 AQS 有什么关系?
    2. Semaphore 和 CountDownLatch 有什么区别?

    源码分析

    基本结构

    Semaphore-cover-iGaTzJ

    通过类图可以看出在 Semaphore 里面有一个静态内部类 Sync 继承了 AQS,同时为了区分公平和非公平的情况,Sync 分别有两个子类:NonfairSync 、FairSync。

    下面根据案例分别从构造函数、acquire()、release() 入手,从而了解内部实现原理。

    初始化

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    

    初始化默认非公平锁, 同时需要传入指定许可数, 可以看到这块代码是调用的 AQS 的 setState(permits) 方法。代码如下:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
    
        NonfairSync(int permits) {
            super(permits);
        }
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                setState(permits);
            }
     }
    
    

    setState 方法其实就是对 AQS 的 state 进行赋值。

    补充

    1. 在 ReentrantLock 中 state 代表加锁状态,0 没有线程获得锁,大于等于 1 已经有线程获得锁,大于 1 说明该获得锁的线程多次重入。
    2. 在 ReentrantReadWriteLock 中 state 代表锁的状态。state 为 0 ,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算可以获取读写锁的实际值。
    3. 而在这里 (CountDownLatch)则代表门闩或者说计数的值。

    如果对 state 有所遗忘,可以阅读前面的 AQS 、CAS 相关代码。 state 在这里代表的是信号量的许可数量。

    acquire()

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    
    

    acquire() 和 acquire(int permits) 调用的都是 sync.acquireSharedInterruptibly(permits) 方法,只不过一个支持传递参数,一个默认为 1。

    acquireSharedInterruptibly 方法,其实就是 Sync 继承自 AQS 的。

    这块可以阅读 AQS 的文章,这里简单介绍下:

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    1. 在失败后会使用 doAcquireSharedInterruptibly(arg); 不断获取资源;
    2. final Node node = addWaiter(Node.SHARED); 会创建节点以共享模式放到队列里;
    3. 在循环中不断判断前一个节点,如果是 head,则尝试获取共享资源;
    4. 在共享模式下获取到资源后会使用 setHeadAndPropagate(node, r); 设置头节点,同时唤醒后续节点。

    tryAcquireShared 是需要子类实现,也就是在 Semaphore.Sync 的实现类中实现了,这里以 FairSync 做讲解:

    
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
    
        FairSync(int permits) {
            super(permits);
        }
    
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 如果前面有节点,则直接返回 -1 表示失败
                if (hasQueuedPredecessors())
                    return -1;
                // 获取当前信号量
                int available = getState();
                // 获取当前剩余量
                int remaining = available - acquires;
                // 如果小于 0 或者 CAS 设置信号量成功 则直接返回
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    

    而这段代码的含义:

    1. 如果前面有节点,则直接阻塞;
    2. 如果当前剩余信号量小于 0 ,则返回负值,直接阻塞;
    3. 如果当前剩余量大于等于 0 ,会 CAS 更新信号量,并返回非负数。

    这块数值的含义,在 AQS 中定义了,含义如下:

    1. 小于 0: 表示失败;
    2. 等于 0: 表示共享模式获取资源成功,但后续的节点不能以共享模式获取成功;
    3. 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。

    release()

    public void release() {
        sync.releaseShared(1);
    }
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
    

    发布许可证的给定数量,该数量增加可用的许可数量。 看其内部调用的是 Sync 的 releaseShared, 其实就是 AQS 的对应方法:

    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    如果实现tryReleaseShared返回true,以共享模式释放资源。 其中的 tryReleaseShared 部分由 Semaphore.Sync 中实现,逻辑如下:

    
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 获取当前 state
            int current = getState();
            // 对 state 进行增加
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // 使用 CAS 赋值
            if (compareAndSetState(current, next))
                return true;
        }
    }
    

    通过上面代码可以看出,在 Semaphore 的 release 方法中主要就是对 state 进行增加,增加成功后会调用 AQS 的 doReleaseShared 方法唤醒头节点。

    总结

    Q&A

    Q: 既然 Semaphore 也是基于 AQS, 那在 Semaphore 中 state 的含义代表什么?
    A: 在 Semaphore 中 state 代表许可数量,acquire 方法当许可小于指定数量会阻塞线程,release 方法增加许可当许可增加成功则唤醒阻塞节点。

    Q: Semaphore 基于 AQS 具体是怎么实现的呢?
    A:

    1. 初始设置 state 的初始值,即初始许可数量。
    2. acquire 方法设置目标数量,当目标数量大于当前数量时,会阻塞线程并将其放到阻塞队列中。此处基于 AQS 实现。
    3. release 对 state 进行增加,成功后会调用 AQS 的 doReleaseShared 唤醒头结点。同样是基于 AQS 实现。

    Q: Semaphore 和 CountDownLatch 有什么区别?
    A: Semaphore 的计数器是递加的,而 CountDownLatch 是递减的。相同点就是计数器都不可以重置。

    结束语

    在阅读 Semaphore 源码过程中,发现其主要功能都是基于 AQS 实现的,可以回顾阅读 AQS 的相关笔记。同样 Semaphore 也支持公平和非公平模式,这块就需要小伙伴自己去阅读啦。

  • 相关阅读:
    玩转树莓派《二》——用python实现动画与多媒体
    pygame(一)
    玩转树莓派(一)
    pythonchallenge(七)
    springmvc定时器
    maven打包成第三方jar包且把pom依赖包打入进来
    mybatis之动态SQL
    黑马12期day01之html&css
    千万级数据表删除特定字断
    自动跳转
  • 原文地址:https://www.cnblogs.com/liuzhihang/p/Semaphore.html
Copyright © 2011-2022 走看看