zoukankan      html  css  js  c++  java
  • 深入浅出Semaphore源码解析

    Semaphore通过permits的值来限制线程访问临界资源的总数,属于有限制次数的共享锁,不支持重入。

    前提条件

    在理解Semaphore时需要具备一些基本的知识:

    理解AQS的实现原理

    之前有写过一篇《深入浅出AQS源码解析》关于AQS的文章,对AQS原理不了解的同学可以先看一下

    Semaphore源码解析

    Semaphore中有3个内部类,分别是SyncNonfairSyncFairSyncSyncNonfairSyncFairSync的抽象类,我们会从解读Semaphore实现的功能开始入手逐渐去解析SyncNonfairSyncFairSync的源码

    public class Semaphore implements java.io.Serializable {
    
        private final Sync sync;
    
        /**
         * 初始化permits个资源
         */
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        /**
         * 初始化permits个资源,根据fair来决定是使用公平锁还是非公平锁的方式
         */
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    
        /**
         * 中断方式获取一个资源
         */
        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * 非中断方式获取一个资源
         */
        public void acquireUninterruptibly() {
            sync.acquireShared(1);
        }
    
        /**
         * 尝试获取一个资源
         */
        public boolean tryAcquire() {
            return sync.nonfairTryAcquireShared(1) >= 0;
        }
    
        /**
         * 尝试超时获取一个资源
         */
        public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        /**
         * 释放一个资源
         */
        public void release() {
            sync.releaseShared(1);
        }
    
        /**
         * 中断方式获取permits个资源
         */
        public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
    
        /**
         * 非中断方式获取permits个资源
         */
        public void acquireUninterruptibly(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireShared(permits);
        }
    
        /**
         * 尝试获取permits个资源
         */
        public boolean tryAcquire(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.nonfairTryAcquireShared(permits) >= 0;
        }
    
        /**
         * 尝试超时获取permits个资源
         */
        public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
        }
    
        /**
         * 释放permits个资源
         */
        public void release(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.releaseShared(permits);
        }
    
        /**
         * 获取当前可用资源数量
         */
        public int availablePermits() {
            return sync.getPermits();
        }
    
        /**
         * 用掉所有的资源,并返回用掉的资源数量
         */
        public int drainPermits() {
            return sync.drainPermits();
        }
    
        /**
         * 缩减reduction个资源
         */
        protected void reducePermits(int reduction) {
            if (reduction < 0) throw new IllegalArgumentException();
            sync.reducePermits(reduction);
        }
    }
    

    虽然Semaphore中的方法比较多,但是都比较简单,都是转调用Sync中的方法,通过解析Sync中的源码来帮助大家理解这些方法是如何实现的

    Sync类源码解析

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 获取所有可用的资源数量
        final int getPermits() {
            return getState();
        }
        // 非公平的方式尝试获取acquires个可用的资源
        final int nonfairTryAcquireShared(int acquires) {
            // 无限循环,尝试获取acquires个资源
            // 如果资源数量不够,返回剩余资源数量
            // 如果资源数量足够且获取成功,返回剩余的资源数量
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    
        // 尝试获取releases个资源
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                // 当releases不允许为负数
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // CAS操作尝试修改state的值
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    
        // 缩减releases个资源
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                // 当releases不允许为负数,也就时不能通过该方法增加资源
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                // CAS操作尝试修改state的值
                if (compareAndSetState(current, next))
                    return;
            }
        }
    
        // 清空所有的资源数量
        final int drainPermits() {
            for (;;) {
                int current = getState();
                // CAS操作尝试将资源数量设置为0
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    

    FairSync类源码解析

    FairSync中的源码很简单,直接上代码

    static final class FairSync extends Sync {
    
        FairSync(int permits) {
            super(permits);
        }
    
        protected int tryAcquireShared(int acquires) {
            /**
             * 具体思路如下:
             * 1、如果AQS的同步队列中有等待的线程,直接获取失败,会加入到AQS的同步队列中
             * 2、如果AQS的同步队列为空,尝试修改state的值来获取acquires个资源
             * 3、一直重复步骤1和2,直到有结果返回才退出无限循环
             */
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    

    NonfairSync类源码解析

    NonfairSync中的源码就更简单,解析如下:

    static final class NonfairSync extends Sync {
    
        NonfairSync(int permits) {
            super(permits);
        }
        
        // 抢占式的获取acquires个资源
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    

    总结

    • permits初始化为1时,Semaphore变成了一个互斥的排他锁
    • permits初始化为无穷大时,Semaphore变成了无锁模式
    • state的值为0的时候,无法获取资源,获取资源的线程会进入AQS的同步队列等待有资源释放时被唤醒
    • Semaphore初始化成非公平锁时,可能会出现有的线程饿死的情况,一般对于控制资源的使用而言,建议初始化为公平锁
    • 可以调用reducePermits动态的缩减资源的数量,但是不能增加资源的数量
  • 相关阅读:
    CPU使用率100%的一个原因
    Oracle 9.2.0.1升级到9.2.0.5
    Javacript同样的意思,更巧的写法
    前端面试题
    Linux shell
    vi/vim 光标移动命令
    CSS 中的分离、抽象和层叠
    practice first开张
    Linux下访问Windows的方法(原)
    Evolution收发Gmail邮件
  • 原文地址:https://www.cnblogs.com/pinxiong/p/13332609.html
Copyright © 2011-2022 走看看