  • semaphore

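    semaphore: a signal flag; flag signalling.

    Semaphore (usually rendered in Chinese as 信号量) is a thread synchronization utility class, used when multiple threads operate concurrently on a shared resource. It represents the notion of a permit: whether a thread is allowed to operate on the resource. A Semaphore can therefore be used to limit the number of threads that access a resource at the same time.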
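As a rough illustration of limiting concurrent access, the sketch below guards a section with a Semaphore and records how many threads were inside it at once. The class name `SemaphoreLimitDemo`, the method `maxConcurrent`, and the thread and task counts are made up for this example; `Thread.yield()` merely stands in for real work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitDemo {
    /** Runs `tasks` jobs on `threads` workers; returns the most threads seen inside the guarded section at once. */
    static int maxConcurrent(int permits, int threads, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                semaphore.acquireUninterruptibly();   // take one permit, waiting if none is free
                try {
                    int now = inside.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.yield();                   // stand-in for work on the shared resource
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();              // always hand the permit back
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        // With 3 permits the reported maximum should never exceed 3.
        System.out.println("max concurrent holders: " + maxConcurrent(3, 8, 100));
    }
}
```

Releasing in a `finally` block is the usual convention, so that a permit is not lost when the guarded work throws.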





        /**
         * Synchronization implementation for semaphore.  Uses AQS state
         * to represent permits. Subclassed into fair and nonfair
         * versions.
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;

            Sync(int permits) {
                // Stores the permit count in the AQS state field. In AQS, state is the
                // synchronization state; for a Semaphore it is the number of permits.
                setState(permits);
            }

            // Calls the parent's getState() to read the synchronization state (state).
            final int getPermits() {
                return getState();
            }

            // This method exists for the nonfair version of the semaphore.
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    // compareAndSetState is the AQS CAS on state, i.e. on the permit count.
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }

            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }

            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    if (compareAndSetState(current, next))
                        return;
                }
            }

            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }
        }
        /** NonFair version */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
            NonfairSync(int permits) {
                super(permits);
            }
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
        // From AbstractQueuedSynchronizer:
        /**
         * Atomically sets synchronization state to the given updated
         * value if the current state value equals the expected value.
         * This operation has memory semantics of a {@code volatile} read
         * and write.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that the actual
         *         value was not equal to the expected value.
         */
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
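The CAS retry loop used by `nonfairTryAcquireShared` and `tryReleaseShared` can be sketched outside AQS with an `AtomicInteger` standing in for the `state` field. The class `CasPermitCounter` and its method names are hypothetical and only mirror the shape of the JDK code; nothing here queues or blocks.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasPermitCounter {
    private final AtomicInteger state;   // plays the role of the AQS state field

    CasPermitCounter(int permits) {
        state = new AtomicInteger(permits);
    }

    /** Same shape as nonfairTryAcquireShared: returns the remaining permits, negative on failure. */
    int tryAcquire(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            // A negative result is returned without touching state; otherwise retry until the CAS wins.
            if (remaining < 0 || state.compareAndSet(available, remaining))
                return remaining;
        }
    }

    /** Same shape as tryReleaseShared: adds permits back, retrying until the CAS wins. */
    void release(int releases) {
        for (;;) {
            int current = state.get();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (state.compareAndSet(current, next))
                return;
        }
    }

    int available() {
        return state.get();
    }

    public static void main(String[] args) {
        CasPermitCounter counter = new CasPermitCounter(2);
        System.out.println(counter.tryAcquire(1));   // 1 permit left
        System.out.println(counter.tryAcquire(2));   // negative: not enough permits
    }
}
```

A failed CAS only means another thread changed the value first, so the loop re-reads and tries again rather than giving up.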


        /** Fair version */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
            FairSync(int permits) {
                super(permits);
            }
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    // hasQueuedPredecessors() is an AQS method that reports whether some thread
                    // is queued ahead of the current one. It is the key difference between the
                    // two versions: the fair version checks the wait queue first, while the
                    // nonfair version goes straight to checking whether state is sufficient.
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }

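    Comparing the two, the difference between the fair and nonfair versions is this: FairSync first checks whether any thread is already waiting in the queue and, if so, declines to acquire, so permits are handed out in arrival order (first come, first served). NonfairSync goes straight to checking whether state can satisfy the request and, if it can, takes the permits immediately, so a newly arriving thread may barge ahead of threads that are already queued.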
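One way to observe the difference from outside is sketched below. A helper thread is left waiting in the queue, and the caller then probes the semaphore. It relies on documented `Semaphore` behavior: the untimed `tryAcquire()` always barges, while `tryAcquire(0, TimeUnit.SECONDS)` honors the fairness setting. The class `FairnessDemo` and method `probe` are names invented for this example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class FairnessDemo {
    /** Queues a helper thread, then probes from the caller. Returns {timed tryAcquire, untimed tryAcquire}. */
    static boolean[] probe(boolean fair) {
        Semaphore semaphore = new Semaphore(1, fair);
        // The helper asks for 2 permits while only 1 exists, so it stays in the wait queue.
        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire(2);
            } catch (InterruptedException ignored) {
                // interrupted at the end of the probe; nothing to clean up
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        while (!semaphore.hasQueuedThreads())
            Thread.yield();                       // wait until the helper is queued

        boolean timed = false;
        try {
            // Zero-timeout tryAcquire goes through tryAcquireShared, so a fair
            // semaphore refuses it while another thread is queued ahead.
            timed = semaphore.tryAcquire(0, TimeUnit.SECONDS);
            if (timed)
                semaphore.release();              // put the permit back for the next probe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Untimed tryAcquire ignores the queue, even on a fair semaphore.
        boolean untimed = semaphore.tryAcquire();
        waiter.interrupt();
        return new boolean[] { timed, untimed };
    }

    public static void main(String[] args) {
        boolean[] fair = probe(true);
        boolean[] nonfair = probe(false);
        System.out.println("fair:    timed=" + fair[0] + " untimed=" + fair[1]);
        System.out.println("nonfair: timed=" + nonfair[0] + " untimed=" + nonfair[1]);
    }
}
```

On the fair semaphore the timed probe should be refused while the untimed one succeeds; on the nonfair semaphore both should succeed.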


        /**
         * Acquires a permit from this semaphore, blocking until one is
         * available, or the thread is {@linkplain Thread#interrupt interrupted}.
         *
         * <p>Acquires a permit, if one is available and returns immediately,
         * reducing the number of available permits by one.
         *
         * <p>If no permit is available then the current thread becomes
         * disabled for thread scheduling purposes and lies dormant until
         * one of two things happens:
         * <ul>
         * <li>Some other thread invokes the {@link #release} method for this
         * semaphore and the current thread is next to be assigned a permit; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread.
         * </ul>
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting
         * for a permit,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * @throws InterruptedException if the current thread is interrupted
         */
        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        /**
         * Acquires in shared mode, aborting if interrupted.  Implemented
         * by first checking interrupt status, then invoking at least once
         * {@link #tryAcquireShared}, returning on success.  Otherwise the
         * thread is queued, possibly repeatedly blocking and unblocking,
         * invoking {@link #tryAcquireShared} until success or the thread
         * is interrupted.
         * @param arg the acquire argument.
         * This value is conveyed to {@link #tryAcquireShared} but is
         * otherwise uninterpreted and can represent anything
         * you like.
         * @throws InterruptedException if the current thread is interrupted
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }


        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
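Because the AQS default `tryAcquireShared` only throws, a synchronizer has to supply its own. The following is a minimal sketch of a counting synchronizer built that way; `MiniSemaphore` and its members are hypothetical names, and it deliberately omits the fairness and overflow checks of the real `Semaphore`.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MiniSemaphore {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;       // a negative value tells AQS to queue the caller
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases))
                    return true;            // true tells AQS to wake queued waiters
            }
        }

        int permits() {
            return getState();
        }
    }

    private final Sync sync;

    MiniSemaphore(int permits) {
        sync = new Sync(permits);
    }

    void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);  // AQS calls back into tryAcquireShared
    }

    void release() {
        sync.releaseShared(1);               // AQS calls back into tryReleaseShared
    }

    int availablePermits() {
        return sync.permits();
    }

    public static void main(String[] args) throws InterruptedException {
        MiniSemaphore semaphore = new MiniSemaphore(2);
        semaphore.acquire();
        System.out.println(semaphore.availablePermits());
        semaphore.release();
        System.out.println(semaphore.availablePermits());
    }
}
```

The subclass only decides whether an attempt succeeds; queueing, parking, and waking threads stay inside AQS.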


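        // Note: this is the exclusive-mode variant. Semaphore.acquire() actually goes through
        // doAcquireSharedInterruptibly, which has the same loop but enqueues a Node.SHARED node,
        // calls tryAcquireShared, and installs the new head with setHeadAndPropagate.
        /**
         * Acquires in exclusive interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    // If the predecessor is the head, this node is first in line (the second
                    // node in the queue), so try once more to acquire the state. On success,
                    // make this node the new head and unlink the old head.
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    // shouldParkAfterFailedAcquire checks and updates the predecessor's status;
                    // parkAndCheckInterrupt parks the thread and reports whether it was
                    // interrupted, in which case InterruptedException is thrown.
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }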
        /**
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }


        /**
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL) // the current node should block
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev; // skip nodes that have been cancelled
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
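Once `shouldParkAfterFailedAcquire` returns true, the thread blocks in `parkAndCheckInterrupt`, which in the JDK parks with `LockSupport.park` and then checks and clears the interrupt flag. The sketch below shows that park, re-check, unpark pattern in isolation; `ParkDemo`, the `ready` flag, and the method name are invented for the example and are not part of AQS.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    /** Parks a worker until `ready` is set; returns true if it finished without seeing an interrupt. */
    static boolean parkThenUnpark() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            while (!ready.get()) {            // always re-check: park may return spuriously
                LockSupport.park();
                if (Thread.interrupted())     // check-and-clear, as parkAndCheckInterrupt does
                    sawInterrupt.set(true);
            }
        });
        worker.start();
        ready.set(true);
        // unpark is safe even if the worker has not parked yet: the wake-up is
        // remembered and the next park() returns immediately.
        LockSupport.unpark(worker);
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive() && !sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("worker finished cleanly: " + parkThenUnpark());
    }
}
```

This also shows why the acquire loop retries before parking: a thread that parks without first re-checking its condition could miss a release that happened just before.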
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;


        /**
         * Acquires a permit from this semaphore, blocking until one is
         * available.
         *
         * <p>Acquires a permit, if one is available and returns immediately,
         * reducing the number of available permits by one.
         *
         * <p>If no permit is available then the current thread becomes
         * disabled for thread scheduling purposes and lies dormant until
         * some other thread invokes the {@link #release} method for this
         * semaphore and the current thread is next to be assigned a permit.
         *
         * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
         * while waiting for a permit then it will continue to wait, but the
         * time at which the thread is assigned a permit may change compared to
         * the time it would have received the permit had no interruption
         * occurred.  When the thread does return from this method its interrupt
         * status will be set.
         */
        public void acquireUninterruptibly() {
            sync.acquireShared(1);
        }
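The behavior described in that Javadoc can be checked with a small experiment: interrupt a thread that is waiting in `acquireUninterruptibly()`, then release a permit and look at its interrupt status when it returns. `UninterruptibleDemo` and its method name are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class UninterruptibleDemo {
    /** Returns true if the waiter came back from acquireUninterruptibly() with its interrupt status set. */
    static boolean interruptStatusAfterAcquire() {
        Semaphore semaphore = new Semaphore(0);
        AtomicBoolean statusOnReturn = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            semaphore.acquireUninterruptibly();   // no InterruptedException to handle
            statusOnReturn.set(Thread.currentThread().isInterrupted());
        });
        waiter.start();
        while (!semaphore.hasQueuedThreads())
            Thread.yield();                       // wait until the waiter is queued
        waiter.interrupt();                       // does not abort the wait
        semaphore.release();                      // only a permit lets the waiter return
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return statusOnReturn.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupt status preserved: " + interruptStatusAfterAcquire());
    }
}
```

The interrupt is not lost: the caller can still see it afterwards and decide how to react, which is the main reason to pick this method over swallowing `InterruptedException` from `acquire()`.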


        /**
         * Releases a permit, returning it to the semaphore.
         *
         * <p>Releases a permit, increasing the number of available permits by
         * one.  If any threads are trying to acquire a permit, then one is
         * selected and given the permit that was just released.  That thread
         * is (re)enabled for thread scheduling purposes.
         *
         * <p>There is no requirement that a thread that releases a permit must
         * have acquired that permit by calling {@link #acquire}.
         * Correct usage of a semaphore is established by programming convention
         * in the application.
         */
        public void release() {
            sync.releaseShared(1);
        }
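The last paragraph of that Javadoc has a practical consequence: a `release()` without a matching `acquire()` simply adds a permit, so the count can rise above its initial value. A minimal sketch follows; the class `ReleaseDemo` is made up for the example.

```java
import java.util.concurrent.Semaphore;

class ReleaseDemo {
    /** Releases once without ever acquiring and returns the resulting permit count. */
    static int releaseWithoutAcquire() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.release();                  // nothing was acquired, yet this is allowed
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(releaseWithoutAcquire());   // prints 2
    }
}
```

This is why callers usually pair each `acquire()` with exactly one `release()` in a `finally` block: the semaphore itself does not enforce the pairing.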

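    Other Semaphore methods

    Besides the basic acquire and release methods above, a few other Semaphore methods are worth knowing:

    drainPermits: acquires all permits that are immediately available and returns how many were taken. In effect it uses CAS to set state to 0.

    reducePermits: similar to nonfairTryAcquireShared in that both use CAS to subtract from state. The difference is that nonfairTryAcquireShared gives up when the result would be negative, while reducePermits subtracts unconditionally (the count may become negative) and only throws on integer underflow. It never blocks.

    isFair: whether permits are contended for fairly or nonfairly, which corresponds to FairSync and NonfairSync in the implementation.

    hasQueuedThreads: whether any threads are currently queued waiting to acquire a permit.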
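A short sketch exercising these methods is shown below. Since `reducePermits` is protected, the example reaches it through a subclass; `OtherMethodsDemo`, `ShrinkableSemaphore`, and `shrink` are names invented here.

```java
import java.util.concurrent.Semaphore;

class OtherMethodsDemo {
    /** reducePermits is protected in Semaphore, so a subclass is needed to call it. */
    static final class ShrinkableSemaphore extends Semaphore {
        ShrinkableSemaphore(int permits, boolean fair) {
            super(permits, fair);
        }

        void shrink(int reduction) {
            reducePermits(reduction);
        }
    }

    /** Returns {permits drained, permits after drain, permits after shrink}. */
    static int[] run() {
        ShrinkableSemaphore semaphore = new ShrinkableSemaphore(3, true);
        int drained = semaphore.drainPermits();           // takes all 3 permits
        int afterDrain = semaphore.availablePermits();    // nothing left
        semaphore.release(1);                             // back to 1 permit
        semaphore.shrink(2);                              // 1 - 2: the count goes negative
        int afterShrink = semaphore.availablePermits();
        return new int[] { drained, afterDrain, afterShrink };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);   // prints 3 0 -1
        ShrinkableSemaphore s = new ShrinkableSemaphore(1, true);
        System.out.println(s.isFair() + " " + s.hasQueuedThreads());   // prints true false
    }
}
```

A negative count means later `acquire()` calls keep blocking until enough releases bring it above zero again.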



    Reference: 呵呵,Semaphore,就这? - 知乎 (zhihu.com)

  • Original article: https://www.cnblogs.com/YsirSun/p/15585527.html