  • Java Multithreading and the JUC Package: Semaphore Source Code Study Notes

    Please forgive any inaccuracies; criticism and corrections are welcome.

    Please respect the author's work; if you repost this, cite the original link:

    http://www.cnblogs.com/go2sea/p/5625536.html

    Semaphore is a shared lock provided by the JUC (java.util.concurrent) package, generally referred to as a semaphore.

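    Through a custom synchronizer, Semaphore maintains one or more shared resources (permits); a thread obtains a shared resource by calling acquire and gives it back by calling release.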
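    As a rough illustration of the acquire/release pairing described above, here is a minimal sketch that lets at most a fixed number of threads into a guarded region at once. The class name BoundedAccessDemo, the method maxConcurrent, and the 5 ms sleep are all hypothetical choices made for this example; only the Semaphore calls come from the JDK.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative and not part of the JDK.
class BoundedAccessDemo {
    /**
     * Starts `threads` workers that each hold one of `permits` permits briefly.
     * Returns the highest number of workers seen inside the guarded region at once.
     */
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire();             // take one permit, blocking if none is left
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                          // no permit was taken, so nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(5);                 // pretend to use the shared resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();             // always hand the permit back
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency = " + maxConcurrent(3, 10));
    }
}
```

    Releasing in a finally block is a design choice rather than something Semaphore enforces: since permits have no owner, a forgotten release simply leaves the count lower for everyone.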

    Source code:

    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     */
    
    /*
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent;
    import java.util.*;
    import java.util.concurrent.locks.*;
    import java.util.concurrent.atomic.*;
    
    /**
     * A counting semaphore.  Conceptually, a semaphore maintains a set of
     * permits.  Each {@link #acquire} blocks if necessary until a permit is
     * available, and then takes it.  Each {@link #release} adds a permit,
     * potentially releasing a blocking acquirer.
     * However, no actual permit objects are used; the {@code Semaphore} just
     * keeps a count of the number available and acts accordingly.
     *
     * <p>Semaphores are often used to restrict the number of threads that can
     * access some (physical or logical) resource. For example, here is
     * a class that uses a semaphore to control access to a pool of items:
     * <pre>
     * class Pool {
     *   private static final int MAX_AVAILABLE = 100;
     *   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
     *
     *   public Object getItem() throws InterruptedException {
     *     available.acquire();
     *     return getNextAvailableItem();
     *   }
     *
     *   public void putItem(Object x) {
     *     if (markAsUnused(x))
     *       available.release();
     *   }
     *
     *   // Not a particularly efficient data structure; just for demo
     *
     *   protected Object[] items = ... whatever kinds of items being managed
     *   protected boolean[] used = new boolean[MAX_AVAILABLE];
     *
     *   protected synchronized Object getNextAvailableItem() {
     *     for (int i = 0; i < MAX_AVAILABLE; ++i) {
     *       if (!used[i]) {
     *          used[i] = true;
     *          return items[i];
     *       }
     *     }
     *     return null; // not reached
     *   }
     *
     *   protected synchronized boolean markAsUnused(Object item) {
     *     for (int i = 0; i < MAX_AVAILABLE; ++i) {
     *       if (item == items[i]) {
     *          if (used[i]) {
     *            used[i] = false;
     *            return true;
     *          } else
     *            return false;
     *       }
     *     }
     *     return false;
     *   }
     *
     * }
     * </pre>
     *
     * <p>Before obtaining an item each thread must acquire a permit from
     * the semaphore, guaranteeing that an item is available for use. When
     * the thread has finished with the item it is returned back to the
     * pool and a permit is returned to the semaphore, allowing another
     * thread to acquire that item.  Note that no synchronization lock is
     * held when {@link #acquire} is called as that would prevent an item
     * from being returned to the pool.  The semaphore encapsulates the
     * synchronization needed to restrict access to the pool, separately
     * from any synchronization needed to maintain the consistency of the
     * pool itself.
     *
     * <p>A semaphore initialized to one, and which is used such that it
     * only has at most one permit available, can serve as a mutual
     * exclusion lock.  This is more commonly known as a <em>binary
     * semaphore</em>, because it only has two states: one permit
     * available, or zero permits available.  When used in this way, the
     * binary semaphore has the property (unlike many {@link Lock}
     * implementations), that the &quot;lock&quot; can be released by a
     * thread other than the owner (as semaphores have no notion of
     * ownership).  This can be useful in some specialized contexts, such
     * as deadlock recovery.
     *
     * <p> The constructor for this class optionally accepts a
     * <em>fairness</em> parameter. When set false, this class makes no
     * guarantees about the order in which threads acquire permits. In
     * particular, <em>barging</em> is permitted, that is, a thread
     * invoking {@link #acquire} can be allocated a permit ahead of a
     * thread that has been waiting - logically the new thread places itself at
     * the head of the queue of waiting threads. When fairness is set true, the
     * semaphore guarantees that threads invoking any of the {@link
     * #acquire() acquire} methods are selected to obtain permits in the order in
     * which their invocation of those methods was processed
     * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
     * applies to specific internal points of execution within these
     * methods.  So, it is possible for one thread to invoke
     * {@code acquire} before another, but reach the ordering point after
     * the other, and similarly upon return from the method.
     * Also note that the untimed {@link #tryAcquire() tryAcquire} methods do not
     * honor the fairness setting, but will take any permits that are
     * available.
     *
     * <p>Generally, semaphores used to control resource access should be
     * initialized as fair, to ensure that no thread is starved out from
     * accessing a resource. When using semaphores for other kinds of
     * synchronization control, the throughput advantages of non-fair
     * ordering often outweigh fairness considerations.
     *
     * <p>This class also provides convenience methods to {@link
     * #acquire(int) acquire} and {@link #release(int) release} multiple
     * permits at a time.  Beware of the increased risk of indefinite
     * postponement when these methods are used without fairness set true.
     *
     * <p>Memory consistency effects: Actions in a thread prior to calling
     * a "release" method such as {@code release()}
     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
     * actions following a successful "acquire" method such as {@code acquire()}
     * in another thread.
     *
     * @since 1.5
     * @author Doug Lea
     *
     */
    
    public class Semaphore implements java.io.Serializable {
        private static final long serialVersionUID = -3222578661600680210L;
        /** All mechanics via AbstractQueuedSynchronizer subclass */
        private final Sync sync;
    
        /**
         * Synchronization implementation for semaphore.  Uses AQS state
         * to represent permits. Subclassed into fair and nonfair
         * versions.
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                setState(permits);
            }
    
            final int getPermits() {
                return getState();
            }
    
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    
            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    if (compareAndSetState(current, next))
                        return;
                }
            }
    
            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }
        }
    
        /**
         * NonFair version
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
        /**
         * Fair version
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
    
            FairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }
    
        /**
         * Creates a {@code Semaphore} with the given number of
         * permits and nonfair fairness setting.
         *
         * @param permits the initial number of permits available.
         *        This value may be negative, in which case releases
         *        must occur before any acquires will be granted.
         */
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        /**
         * Creates a {@code Semaphore} with the given number of
         * permits and the given fairness setting.
         *
         * @param permits the initial number of permits available.
         *        This value may be negative, in which case releases
         *        must occur before any acquires will be granted.
         * @param fair {@code true} if this semaphore will guarantee
         *        first-in first-out granting of permits under contention,
         *        else {@code false}
         */
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    
        /**
         * Acquires a permit from this semaphore, blocking until one is
         * available, or the thread is {@linkplain Thread#interrupt interrupted}.
         *
         * <p>Acquires a permit, if one is available and returns immediately,
         * reducing the number of available permits by one.
         *
         * <p>If no permit is available then the current thread becomes
         * disabled for thread scheduling purposes and lies dormant until
         * one of two things happens:
         * <ul>
         * <li>Some other thread invokes the {@link #release} method for this
         * semaphore and the current thread is next to be assigned a permit; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread.
         * </ul>
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting
         * for a permit,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * @throws InterruptedException if the current thread is interrupted
         */
        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * Acquires a permit from this semaphore, blocking until one is
         * available.
         *
         * <p>Acquires a permit, if one is available and returns immediately,
         * reducing the number of available permits by one.
         *
         * <p>If no permit is available then the current thread becomes
         * disabled for thread scheduling purposes and lies dormant until
         * some other thread invokes the {@link #release} method for this
         * semaphore and the current thread is next to be assigned a permit.
         *
         * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
         * while waiting for a permit then it will continue to wait, but the
         * time at which the thread is assigned a permit may change compared to
         * the time it would have received the permit had no interruption
         * occurred.  When the thread does return from this method its interrupt
         * status will be set.
         */
        public void acquireUninterruptibly() {
            sync.acquireShared(1);
        }
    
        /**
         * Acquires a permit from this semaphore, only if one is available at the
         * time of invocation.
         *
         * <p>Acquires a permit, if one is available and returns immediately,
         * with the value {@code true},
         * reducing the number of available permits by one.
         *
         * <p>If no permit is available then this method will return
         * immediately with the value {@code false}.
         *
         * <p>Even when this semaphore has been set to use a
         * fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
         * immediately acquire a permit if one is available, whether or not
         * other threads are currently waiting.
         * This &quot;barging&quot; behavior can be useful in certain
         * circumstances, even though it breaks fairness. If you want to honor
         * the fairness setting, then use
         * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
         * which is almost equivalent (it also detects interruption).
         *
         * @return {@code true} if a permit was acquired and {@code false}
         *         otherwise
         */
        public boolean tryAcquire() {
            return sync.nonfairTryAcquireShared(1) >= 0;
        }
    
        /**
         * Acquires a permit from this semaphore, if one becomes available
         * within the given waiting time and the current thread has not
         * been {@linkplain Thread#interrupt interrupted}.
         *
         * <p>Acquires a permit, if one is available and returns immediately,
         * with the value {@code true},
         * reducing the number of available permits by one.
         *
         * <p>If no permit is available then the current thread becomes
         * disabled for thread scheduling purposes and lies dormant until
         * one of three things happens:
         * <ul>
         * <li>Some other thread invokes the {@link #release} method for this
         * semaphore and the current thread is next to be assigned a permit; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         * <li>The specified waiting time elapses.
         * </ul>
         *
         * <p>If a permit is acquired then the value {@code true} is returned.
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting
         * to acquire a permit,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * <p>If the specified waiting time elapses then the value {@code false}
         * is returned.  If the time is less than or equal to zero, the method
         * will not wait at all.
         *
         * @param timeout the maximum time to wait for a permit
         * @param unit the time unit of the {@code timeout} argument
         * @return {@code true} if a permit was acquired and {@code false}
         *         if the waiting time elapsed before a permit was acquired
         * @throws InterruptedException if the current thread is interrupted
         */
        public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        /**
         * Releases a permit, returning it to the semaphore.
         *
         * <p>Releases a permit, increasing the number of available permits by
         * one.  If any threads are trying to acquire a permit, then one is
         * selected and given the permit that was just released.  That thread
         * is (re)enabled for thread scheduling purposes.
         *
         * <p>There is no requirement that a thread that releases a permit must
         * have acquired that permit by calling {@link #acquire}.
         * Correct usage of a semaphore is established by programming convention
         * in the application.
         */
        public void release() {
            sync.releaseShared(1);
        }
    
        /**
         * Acquires the given number of permits from this semaphore,
         * blocking until all are available,
         * or the thread is {@linkplain Thread#interrupt interrupted}.
         *
         * <p>Acquires the given number of permits, if they are available,
         * and returns immediately, reducing the number of available permits
         * by the given amount.
         *
         * <p>If insufficient permits are available then the current thread becomes
         * disabled for thread scheduling purposes and lies dormant until
         * one of two things happens:
         * <ul>
         * <li>Some other thread invokes one of the {@link #release() release}
         * methods for this semaphore, the current thread is next to be assigned
         * permits and the number of available permits satisfies this request; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread.
         * </ul>
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting
         * for a permit,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         * Any permits that were to be assigned to this thread are instead
         * assigned to other threads trying to acquire permits, as if
         * permits had been made available by a call to {@link #release()}.
         *
         * @param permits the number of permits to acquire
         * @throws InterruptedException if the current thread is interrupted
         * @throws IllegalArgumentException if {@code permits} is negative
         */
        public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
    
        /**
         * Acquires the given number of permits from this semaphore,
         * blocking until all are available.
         *
         * <p>Acquires the given number of permits, if they are available,
         * and returns immediately, reducing the number of available permits
         * by the given amount.
         *
         * <p>If insufficient permits are available then the current thread becomes
         * disabled for thread scheduling purposes and lies dormant until
         * some other thread invokes one of the {@link #release() release}
         * methods for this semaphore, the current thread is next to be assigned
         * permits and the number of available permits satisfies this request.
         *
         * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
         * while waiting for permits then it will continue to wait and its
         * position in the queue is not affected.  When the thread does return
         * from this method its interrupt status will be set.
         *
         * @param permits the number of permits to acquire
         * @throws IllegalArgumentException if {@code permits} is negative
         *
         */
        public void acquireUninterruptibly(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireShared(permits);
        }
    
        /**
         * Acquires the given number of permits from this semaphore, only
         * if all are available at the time of invocation.
         *
         * <p>Acquires the given number of permits, if they are available, and
         * returns immediately, with the value {@code true},
         * reducing the number of available permits by the given amount.
         *
         * <p>If insufficient permits are available then this method will return
         * immediately with the value {@code false} and the number of available
         * permits is unchanged.
         *
         * <p>Even when this semaphore has been set to use a fair ordering
         * policy, a call to {@code tryAcquire} <em>will</em>
         * immediately acquire a permit if one is available, whether or
         * not other threads are currently waiting.  This
         * &quot;barging&quot; behavior can be useful in certain
         * circumstances, even though it breaks fairness. If you want to
         * honor the fairness setting, then use {@link #tryAcquire(int,
         * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
         * which is almost equivalent (it also detects interruption).
         *
         * @param permits the number of permits to acquire
         * @return {@code true} if the permits were acquired and
         *         {@code false} otherwise
         * @throws IllegalArgumentException if {@code permits} is negative
         */
        public boolean tryAcquire(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.nonfairTryAcquireShared(permits) >= 0;
        }
    
        /**
         * Acquires the given number of permits from this semaphore, if all
         * become available within the given waiting time and the current
         * thread has not been {@linkplain Thread#interrupt interrupted}.
         *
         * <p>Acquires the given number of permits, if they are available and
         * returns immediately, with the value {@code true},
         * reducing the number of available permits by the given amount.
         *
         * <p>If insufficient permits are available then
         * the current thread becomes disabled for thread scheduling
         * purposes and lies dormant until one of three things happens:
         * <ul>
         * <li>Some other thread invokes one of the {@link #release() release}
         * methods for this semaphore, the current thread is next to be assigned
         * permits and the number of available permits satisfies this request; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         * <li>The specified waiting time elapses.
         * </ul>
         *
         * <p>If the permits are acquired then the value {@code true} is returned.
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting
         * to acquire the permits,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         * Any permits that were to be assigned to this thread, are instead
         * assigned to other threads trying to acquire permits, as if
         * the permits had been made available by a call to {@link #release()}.
         *
         * <p>If the specified waiting time elapses then the value {@code false}
         * is returned.  If the time is less than or equal to zero, the method
         * will not wait at all.  Any permits that were to be assigned to this
         * thread, are instead assigned to other threads trying to acquire
         * permits, as if the permits had been made available by a call to
         * {@link #release()}.
         *
         * @param permits the number of permits to acquire
         * @param timeout the maximum time to wait for the permits
         * @param unit the time unit of the {@code timeout} argument
         * @return {@code true} if all permits were acquired and {@code false}
         *         if the waiting time elapsed before all permits were acquired
         * @throws InterruptedException if the current thread is interrupted
         * @throws IllegalArgumentException if {@code permits} is negative
         */
        public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
        }
    
        /**
         * Releases the given number of permits, returning them to the semaphore.
         *
         * <p>Releases the given number of permits, increasing the number of
         * available permits by that amount.
         * If any threads are trying to acquire permits, then one
         * is selected and given the permits that were just released.
         * If the number of available permits satisfies that thread's request
         * then that thread is (re)enabled for thread scheduling purposes;
         * otherwise the thread will wait until sufficient permits are available.
         * If there are still permits available
         * after this thread's request has been satisfied, then those permits
         * are assigned in turn to other threads trying to acquire permits.
         *
         * <p>There is no requirement that a thread that releases a permit must
         * have acquired that permit by calling {@link Semaphore#acquire acquire}.
         * Correct usage of a semaphore is established by programming convention
         * in the application.
         *
         * @param permits the number of permits to release
         * @throws IllegalArgumentException if {@code permits} is negative
         */
        public void release(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.releaseShared(permits);
        }
    
        /**
         * Returns the current number of permits available in this semaphore.
         *
         * <p>This method is typically used for debugging and testing purposes.
         *
         * @return the number of permits available in this semaphore
         */
        public int availablePermits() {
            return sync.getPermits();
        }
    
        /**
         * Acquires and returns all permits that are immediately available.
         *
         * @return the number of permits acquired
         */
        public int drainPermits() {
            return sync.drainPermits();
        }
    
        /**
         * Shrinks the number of available permits by the indicated
         * reduction. This method can be useful in subclasses that use
         * semaphores to track resources that become unavailable. This
         * method differs from {@code acquire} in that it does not block
         * waiting for permits to become available.
         *
         * @param reduction the number of permits to remove
         * @throws IllegalArgumentException if {@code reduction} is negative
         */
        protected void reducePermits(int reduction) {
            if (reduction < 0) throw new IllegalArgumentException();
            sync.reducePermits(reduction);
        }
    
        /**
         * Returns {@code true} if this semaphore has fairness set true.
         *
         * @return {@code true} if this semaphore has fairness set true
         */
        public boolean isFair() {
            return sync instanceof FairSync;
        }
    
        /**
         * Queries whether any threads are waiting to acquire. Note that
         * because cancellations may occur at any time, a {@code true}
         * return does not guarantee that any other thread will ever
         * acquire.  This method is designed primarily for use in
         * monitoring of the system state.
         *
         * @return {@code true} if there may be other threads waiting to
         *         acquire the lock
         */
        public final boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
        /**
         * Returns an estimate of the number of threads waiting to acquire.
         * The value is only an estimate because the number of threads may
         * change dynamically while this method traverses internal data
         * structures.  This method is designed for use in monitoring of the
         * system state, not for synchronization control.
         *
         * @return the estimated number of threads waiting for this lock
         */
        public final int getQueueLength() {
            return sync.getQueueLength();
        }
    
        /**
         * Returns a collection containing threads that may be waiting to acquire.
         * Because the actual set of threads may change dynamically while
         * constructing this result, the returned collection is only a best-effort
         * estimate.  The elements of the returned collection are in no particular
         * order.  This method is designed to facilitate construction of
         * subclasses that provide more extensive monitoring facilities.
         *
         * @return the collection of threads
         */
        protected Collection<Thread> getQueuedThreads() {
            return sync.getQueuedThreads();
        }
    
        /**
         * Returns a string identifying this semaphore, as well as its state.
         * The state, in brackets, includes the String {@code "Permits ="}
         * followed by the number of permits.
         *
         * @return a string identifying this semaphore, as well as its state
         */
        public String toString() {
            return super.toString() + "[Permits = " + sync.getPermits() + "]";
        }
    }

    Let's now analyze in detail how Semaphore works.

    1. Constructors

        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }

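    Initializing a Semaphore requires specifying the number of shared resources (permits). Semaphore offers two modes, fair and nonfair; if no mode is specified, it defaults to nonfair. As we will see later, the two modes differ in the ordering policy applied when acquiring shared resources. Semaphore has three nested classes: Sync, NonfairSync, and FairSync. The latter two extend Sync, and Sync extends AbstractQueuedSynchronizer (AQS). Apart from the serialization version UID, Semaphore has a single member field, sync, which is initialized to a FairSync in fair mode and to a NonfairSync in nonfair mode.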
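    The effect of the two constructors can be observed through the public query methods isFair() and availablePermits(). The sketch below assumes nothing beyond those JDK methods; the class name SemaphoreModesDemo and the helper describe are made up for illustration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class; only the Semaphore calls are real JDK API.
class SemaphoreModesDemo {
    /** Summarizes a semaphore using only its public query methods. */
    static String describe(Semaphore s) {
        return (s.isFair() ? "fair" : "nonfair") + ", permits=" + s.availablePermits();
    }

    public static void main(String[] args) {
        // One-argument constructor: sync is a NonfairSync.
        System.out.println(describe(new Semaphore(3)));        // prints "nonfair, permits=3"
        // Two-argument constructor with fair=true: sync is a FairSync.
        System.out.println(describe(new Semaphore(3, true)));  // prints "fair, permits=3"
    }
}
```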

    2. acquire: acquiring resources with interrupt response

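    Semaphore provides two ways to acquire resources: one that responds to interrupts and one that does not. Let's look at the interruptible version first.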

        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

    acquire delegates to the acquireSharedInterruptibly method that the synchronizer sync inherits from AQS:

        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }

    acquireSharedInterruptibly first checks for interruption, then calls tryAcquireShared to try to obtain the permits. This is where the fair and nonfair code paths diverge: FairSync and NonfairSync each override tryAcquireShared.
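
    One consequence of checking for interruption first is that acquire() fails for an already-interrupted thread even when a permit is free. The sketch below demonstrates this; InterruptCheckFirst is a hypothetical class name used only for this example.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK.
class InterruptCheckFirst {
    // Returns true if acquire() threw InterruptedException although a permit
    // was free, and the permit count was left untouched.
    static boolean acquireThrowsWhenPreInterrupted() {
        Semaphore sem = new Semaphore(1);
        Thread.currentThread().interrupt();      // set the interrupt flag up front
        try {
            sem.acquire();                       // the interrupt check runs before any acquire attempt
            sem.release();
            return false;
        } catch (InterruptedException e) {
            return sem.availablePermits() == 1;  // nothing was taken
        } finally {
            Thread.interrupted();                // make sure the flag is clear for the caller
        }
    }

    public static void main(String[] args) {
        System.out.println("threw before acquiring: " + acquireThrowsWhenPreInterrupted());
    }
}
```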

    Let's first look at tryAcquireShared in nonfair mode:

            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }

    It simply calls nonfairTryAcquireShared, provided by the parent class Sync:

            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }

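    Note that this is a CAS spin loop. Because Semaphore is a shared lock, several threads may request permits at the same time, so the CAS can fail. The loop runs until the thread either acquires successfully and returns the remaining permit count, or finds too few permits left and returns a negative value to signal failure. Why not return failure as soon as a CAS fails? Doing so would not be incorrect, but it would be less efficient: with permits still available, a thread whose CAS failed because of contention would be appended to the tail of the wait queue, and a thread at the head of the queue would then have to be woken up to try to acquire. Compared with simply retrying in the loop, that adds the overhead of manipulating the wait queue.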
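
    The same retry-on-CAS-failure pattern can be reproduced outside AQS. The following is a minimal sketch that uses an AtomicInteger in place of the AQS state field; CasPermitCounter and its methods are hypothetical names, and the class has no wait queue, so it only mirrors the shape of nonfairTryAcquireShared.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the AQS state; not how Semaphore is actually built.
class CasPermitCounter {
    private final AtomicInteger permits;

    CasPermitCounter(int initial) {
        permits = new AtomicInteger(initial);
    }

    // Returns the remaining count on success, or a negative value if there
    // are not enough permits. Never blocks.
    int tryAcquire(int acquires) {
        for (;;) {
            int available = permits.get();
            int remaining = available - acquires;
            if (remaining < 0 || permits.compareAndSet(available, remaining))
                return remaining;
            // CAS lost a race with another thread: re-read the count and retry.
        }
    }

    int available() {
        return permits.get();
    }

    public static void main(String[] args) {
        CasPermitCounter counter = new CasPermitCounter(2);
        System.out.println(counter.tryAcquire(1));
        System.out.println(counter.tryAcquire(1));
        System.out.println(counter.tryAcquire(1));
    }
}
```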

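    The "nonfair" semantics show up here: if a thread obtains permits through nonfairTryAcquireShared, that may be unfair to the threads already in the wait queue, which arrived earlier but did not get the permits first.

    If tryAcquireShared fails, acquireSharedInterruptibly calls doAcquireSharedInterruptibly, which puts the current thread in the wait queue and then loops, retrying the acquire each time it is woken: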

        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

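    Notice that in doAcquireSharedInterruptibly, when a thread returns from parkAndCheckInterrupt because it was interrupted, an InterruptedException is thrown immediately. Recall the doAcquireShared method from our AQS analysis: at this point it records the interrupt in a local variable interrupted without handling it, and only after the permits have been acquired does it call selfInterrupt to restore the interrupt status. This is the key difference between the two methods: whether they respond to interruption promptly.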
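
    Seen from the caller's side, this means a thread blocked in acquire() is released from its wait by an interrupt. A minimal sketch follows; InterruptWhileWaiting is a hypothetical class name, and the busy-wait on hasQueuedThreads() is just a simple way to make sure the waiter is queued before interrupting it.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK.
class InterruptWhileWaiting {
    // Returns true if a thread blocked in acquire() ended with InterruptedException.
    static boolean blockedAcquireIsInterrupted() {
        Semaphore sem = new Semaphore(0);              // no permits: acquire() has to wait
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                sem.acquire();
            } catch (InterruptedException e) {
                sawInterrupt.set(true);                // thrown straight out of the wait
            }
        });
        waiter.start();
        while (!sem.hasQueuedThreads())
            Thread.yield();                            // wait until the waiter is in the queue
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter interrupted: " + blockedAcquireIsInterrupted());
    }
}
```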

    Now let's look at tryAcquireShared in fair mode:

            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }

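    Compared with nonfairTryAcquireShared in nonfair mode, the fair-mode tryAcquireShared performs one extra check before trying to acquire: if it finds that other threads are already waiting in the queue, it returns -1 right away to signal failure. The calling acquireSharedInterruptibly method then invokes doAcquireSharedInterruptibly, which places the current thread in the wait queue. This is exactly what "fair" means here: if some thread entered the wait queue before me and is still waiting, I go straight into the queue as well, so threads obtain permits in the order in which they requested them.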
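
    The effect of the hasQueuedPredecessors check can be observed from outside. In the sketch below one permit is free while another thread is already queued waiting for two; a newcomer then asks for one permit using tryAcquire(0, TimeUnit.SECONDS), which goes through tryAcquireShared and therefore honors the fairness setting. FairnessDemo and newcomerGetsPermit are hypothetical names for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class FairnessDemo {
    // One permit is free and a thread is already queued for two permits.
    // Reports whether a newcomer asking for a single permit gets it.
    static boolean newcomerGetsPermit(boolean fair) {
        Semaphore sem = new Semaphore(1, fair);
        Thread queued = new Thread(() -> {
            try {
                sem.acquire(2);          // needs 2, only 1 available: joins the wait queue
                sem.release(2);
            } catch (InterruptedException e) {
                // expected: the demo interrupts this thread to clean up
            }
        });
        queued.start();
        while (!sem.hasQueuedThreads())
            Thread.yield();              // wait until the first thread is queued
        boolean got = false;
        try {
            got = sem.tryAcquire(0, TimeUnit.SECONDS);   // zero timeout: never parks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            queued.interrupt();
            try {
                queued.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return got;
    }

    public static void main(String[] args) {
        System.out.println("fair:    newcomer got permit = " + newcomerGetsPermit(true));
        System.out.println("nonfair: newcomer got permit = " + newcomerGetsPermit(false));
    }
}
```

    In fair mode the newcomer is refused because a thread is queued ahead of it; in nonfair mode it barges in and takes the free permit.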

    3. acquireUninterruptibly: acquiring a permit without responding to interruption

        public void acquireUninterruptibly() {
            sync.acquireShared(1);
        }

    acquireUninterruptibly calls the acquireShared method provided by AQS:

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }

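    acquireShared tries to acquire right away; unlike acquireSharedInterruptibly, it does not check for interruption first. If that attempt fails, it calls doAcquireShared. I analyzed that method in detail in another post, my AQS source code study notes, so here we focus only on how it differs from doAcquireSharedInterruptibly: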

        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

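    As mentioned above, the only difference is what happens when the thread returns from parkAndCheckInterrupt because of an interrupt: instead of throwing, it records the interrupt in the local variable interrupted and keeps waiting; once the permits have been acquired, it calls selfInterrupt to restore the interrupt status.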
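
    The deferred handling is visible to the caller: a thread interrupted while waiting in acquireUninterruptibly() keeps waiting, and once it finally gets a permit its interrupt flag is set. A minimal sketch, with UninterruptibleDemo as a hypothetical class name:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK.
class UninterruptibleDemo {
    // Returns true if the waiter obtained the permit and found its
    // interrupt flag set afterwards.
    static boolean interruptIsDeferred() {
        Semaphore sem = new Semaphore(0);
        AtomicBoolean acquired = new AtomicBoolean(false);
        AtomicBoolean flagAfterAcquire = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            sem.acquireUninterruptibly();       // an interrupt does not end this wait
            acquired.set(true);
            flagAfterAcquire.set(Thread.currentThread().isInterrupted());
        });
        waiter.start();
        while (!sem.hasQueuedThreads())
            Thread.yield();                     // wait until the waiter is queued
        waiter.interrupt();                     // recorded, but the waiter keeps waiting
        sem.release();                          // only a permit lets it proceed
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired.get() && flagAfterAcquire.get();
    }

    public static void main(String[] args) {
        System.out.println("acquired with interrupt flag set: " + interruptIsDeferred());
    }
}
```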

    4. acquire(int) & acquireUninterruptibly(int): acquiring a specified number of permits

        public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
    
        public void acquireUninterruptibly(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireShared(permits);
        }

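    As you can see, the only differences from the single-permit versions are the argument value and a check that rejects negative values, so we won't go over them again.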

    5. release: releasing permits

    Releasing works the same way in fair and nonfair mode:

        public void release() {
            sync.releaseShared(1);
        }
        
        public void release(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.releaseShared(permits);
        }

    Both call the releaseShared method provided by AQS:

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }

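    releaseShared first calls our overridden tryReleaseShared to release the permits, then calls doReleaseShared to wake up threads waiting in the queue. doReleaseShared was analyzed in detail in my AQS source code study notes, so we won't repeat it here and will focus on tryReleaseShared: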

            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }

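    This method is also a CAS spin loop, because Semaphore is a shared lock and several threads may release permits at the same time, so the CAS can fail. The method always ends up releasing successfully and returning true, unless the permit count overflows, in which case it throws an Error.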
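
    Because tryReleaseShared only adds to the counter, a release does not have to be paired with an earlier acquire, and the count can grow beyond the value passed to the constructor. The sketch below shows multi-permit acquire and release followed by one such unpaired release; ReleaseDemo is a hypothetical class name.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK.
class ReleaseDemo {
    // Returns the available-permit count observed after each step.
    static int[] permitCounts() {
        Semaphore sem = new Semaphore(5);
        int[] seen = new int[3];
        sem.acquireUninterruptibly(3);      // take 3 of the 5 permits
        seen[0] = sem.availablePermits();
        sem.release(3);                     // give them back
        seen[1] = sem.availablePermits();
        sem.release();                      // no matching acquire: the count simply grows
        seen[2] = sem.availablePermits();
        return seen;
    }

    public static void main(String[] args) {
        for (int count : permitCounts())
            System.out.println(count);
    }
}
```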

    6. tryAcquire & tryAcquire(timeout)

        public boolean tryAcquire() {
            return sync.nonfairTryAcquireShared(1) >= 0;
        }
    
        public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        public boolean tryAcquire(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.nonfairTryAcquireShared(permits) >= 0;
        }
    
        public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
        }

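    The tryAcquire variants without a timeout call nonfairTryAcquireShared, which we have already analyzed. We focus on the variants that take a timeout. Timed waiting is implemented by the tryAcquireSharedNanos method provided by AQS: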

        public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
        }

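    Note that timed waits always respond to interruption promptly. The method first checks for interruption, then calls tryAcquireShared to try to acquire. On success it returns true directly; otherwise it calls doAcquireSharedNanos: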

        private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return true;
                        }
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

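    Before looping, the method computes deadline, the point in time at which waiting must end. On each iteration it recomputes the remaining wait time nanosTimeout; if nanosTimeout is less than or equal to 0, the deadline has been reached and the method returns false to indicate a timeout.

    One detail is worth noting: spinForTimeoutThreshold defines a threshold. When the remaining wait time is below this value, the thread is no longer parked but keeps spinning, trying to acquire. Doug Lea's comment on this value reads: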

        /**
         * The number of nanoseconds for which it is faster to spin
         * rather than to use timed park. A rough estimate suffices
         * to improve responsiveness with very short timeouts.
         */

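    My understanding is that park and unpark carry a certain cost, and when nanosTimeout is very small that cost becomes relatively large. With this threshold, threads with a short remaining wait keep spinning, which improves responsiveness for short timeouts; and because nanosTimeout is so small, the spinning itself costs little.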
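
    The deadline-plus-threshold pattern can be sketched on its own, outside AQS. In the example below a thread waits for a flag instead of a permit; TimedWaitSketch, await and SPIN_THRESHOLD_NANOS are hypothetical names, the threshold value is only a stand-in for the AQS constant, and nothing unparks the waiter early, so this is an illustration of the loop structure rather than a usable synchronizer.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of a deadline-based wait; not the AQS implementation.
class TimedWaitSketch {
    // Assumed stand-in for spinForTimeoutThreshold.
    static final long SPIN_THRESHOLD_NANOS = 1000L;

    // Waits until 'ready' is true or the timeout elapses.
    // Returns true if 'ready' was observed, false on timeout or interrupt.
    static boolean await(AtomicBoolean ready, long nanosTimeout) {
        if (nanosTimeout <= 0L)
            return ready.get();
        final long deadline = System.nanoTime() + nanosTimeout;
        for (;;) {
            if (ready.get())
                return true;
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;                         // deadline reached
            if (nanosTimeout > SPIN_THRESHOLD_NANOS)
                LockSupport.parkNanos(nanosTimeout);  // long wait left: park
            // short wait left: skip the park and spin straight into the next check
            if (Thread.currentThread().isInterrupted())
                return false;                         // sketch: give up, leave the flag set
        }
    }

    public static void main(String[] args) {
        System.out.println(await(new AtomicBoolean(true), 1_000_000L));
        System.out.println(await(new AtomicBoolean(false), 1_000_000L));
    }
}
```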

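    Beyond that, doAcquireSharedNanos differs from the untimed doAcquireShared in two important ways: (1) because of the time limit, when the thread returns from park we cannot tell whether it returned due to an interrupt or a timeout, so Thread.interrupted() is called to check the interrupt flag; (2) doAcquireSharedNanos responds to interruption promptly, whereas doAcquireShared defers it.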
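
    From the caller's point of view the tryAcquire family behaves as follows: the untimed form answers immediately, and the timed form waits at most the given time before giving up. A minimal sketch, with TryAcquireDemo as a hypothetical class name and 50 ms as an arbitrary timeout:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class TryAcquireDemo {
    // Returns {first untimed try, second untimed try, timed try with no permits left}.
    static boolean[] results() {
        Semaphore sem = new Semaphore(1);
        boolean first = sem.tryAcquire();     // a permit is free: succeeds without blocking
        boolean second = sem.tryAcquire();    // none left: fails immediately
        boolean timed;
        try {
            // Waits up to 50 ms for a permit that nobody releases, then gives up.
            timed = sem.tryAcquire(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timed = false;
        }
        return new boolean[] { first, second, timed };
    }

    public static void main(String[] args) {
        for (boolean result : results())
            System.out.println(result);
    }
}
```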

    7. drainPermits & reducePermits: changing the number of available permits

    Semaphore provides an operation that "drains" all remaining permits:

        public int drainPermits() {
            return sync.drainPermits();
        }

    drainPermits calls the method of the same name on the custom synchronizer Sync:

            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }

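    A CAS spin loop sets the remaining permit count to zero and returns how many permits were taken.

    Now let's look at the operation that "reduces" the remaining permits: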

        protected void reducePermits(int reduction) {
            if (reduction < 0) throw new IllegalArgumentException();
            sync.reducePermits(reduction);
        }

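    First, the reduction must be one-directional: the argument must not be negative, so the count can only decrease, never increase. Then the method of the same name on Sync is called: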

            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    if (compareAndSetState(current, next))
                        return;
                }
            }

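    A CAS spin loop subtracts the reduction from the remaining permit count.

    Two things are worth noting about these two operations: (1) neither has a matching undo method; (2) both act only on the remaining (available) permits, not on the total, so permits currently held by threads are not affected. drainPermits returns immediately when the count is already 0, whereas reducePermits performs no such check, so it can leave the available count negative.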
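
    The sketch below exercises both operations. Since reducePermits is protected, it has to be reached through a subclass; ResizableSemaphore, shrink and DrainReduceDemo are hypothetical names introduced only for this example.

```java
import java.util.concurrent.Semaphore;

// Hypothetical subclass that exposes the protected reducePermits method.
class ResizableSemaphore extends Semaphore {
    ResizableSemaphore(int permits) {
        super(permits);
    }

    void shrink(int reduction) {
        reducePermits(reduction);
    }
}

// Hypothetical demo class, not part of the JDK.
class DrainReduceDemo {
    // Returns {permits drained, permits available after the drain,
    //          permits available after shrinking by 2}.
    static int[] run() {
        ResizableSemaphore sem = new ResizableSemaphore(5);
        sem.acquireUninterruptibly(2);           // 2 held, 3 available
        int drained = sem.drainPermits();        // takes only the 3 that are available
        int afterDrain = sem.availablePermits();
        sem.shrink(2);                           // does not block; the count drops below zero
        return new int[] { drained, afterDrain, sem.availablePermits() };
    }

    public static void main(String[] args) {
        for (int value : run())
            System.out.println(value);
    }
}
```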

    8. Miscellaneous

        public int availablePermits() {
            return sync.getPermits();
        }
    
        public boolean isFair() {
            return sync instanceof FairSync;
        }
    
        public final boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
        public final int getQueueLength() {
            return sync.getQueueLength();
        }
    
        protected Collection<Thread> getQueuedThreads() {
            return sync.getQueuedThreads();
        }
    
        public String toString() {
            return super.toString() + "[Permits = " + sync.getPermits() + "]";
        }

    These methods are straightforward, so we won't go into them.
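
    For completeness, here is a short sketch that calls the public query methods on a semaphore with one permit taken and nobody waiting. MonitoringDemo and its two helper methods are hypothetical names.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK.
class MonitoringDemo {
    private static Semaphore sample() {
        Semaphore sem = new Semaphore(3, true);
        sem.acquireUninterruptibly();            // 1 of 3 permits taken, no waiters
        return sem;
    }

    // Summarizes what the query methods report.
    static String describe() {
        Semaphore sem = sample();
        return "available=" + sem.availablePermits()
             + " fair=" + sem.isFair()
             + " queued=" + sem.hasQueuedThreads()
             + " queueLength=" + sem.getQueueLength();
    }

    // toString() appends the permit count in brackets to Object.toString().
    static String text() {
        return sample().toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
        System.out.println(text());
    }
}
```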

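    Summary:

    Semaphore is a typical shared lock in the JUC package. By defining two different synchronizers (FairSync & NonfairSync) it offers a fair and a nonfair mode. In both modes it provides timed/untimed and interruptible/uninterruptible ways to acquire permits (timed acquisition always responds to interruption promptly), while the release operations are the same for all of them.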


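    Author: 开方乘十

    Source: http://www.cnblogs.com/go2sea/

    Copyright of this article is shared by the author 开方乘十 and cnblogs (博客园). Reposting is welcome, but unless the author agrees otherwise this notice must be retained and a prominent link to the original article must be given on the page; otherwise the author reserves the right to pursue legal liability.

    If you find any mistakes, corrections by email (hailong.ma@qq.com) are welcome. Thank you.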

  • Original article: https://www.cnblogs.com/go2sea/p/5625536.html