zoukankan      html  css  js  c++  java
  • Semaphore 源码分析

    Semaphore

    Semaphore 是基于同步器实现的计数信号量。
    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
    公平的信号量可以保证不会出现线程饥饿,而非公平的信号量可以提供更高的吞吐量。
    

    创建实例

        private final Sync sync;
    
        /**
         *  信号量的同步器实现
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                // 写入许可数
                setState(permits);
            }
    
            final int getPermits() {
                // 获取可用许可数
                return getState();
            }
    
            /**
             *  非公平地获取 acquires 个许可
             * created by ZXD at 15 Dec 2018 T 11:43:17
             * @param acquires
             * @return
             */
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    // 读取可用许可数
                    final int available = getState();
                    // 计算剩余许可数
                    final int remaining = available - acquires;
                    /**
                     * 1)剩余许可数 < 0,则直接返回,不更新可用许可数
                     * 2)更新可用许可书
                     */
                    if (remaining < 0 ||
                            compareAndSetState(available, remaining)) {
                        return remaining;
                    }
                }
            }
    
            /**
             *  尝试释放 releases 个许可
             * created by ZXD at 15 Dec 2018 T 11:44:56
             * @param releases
             * @return
             */
            @Override
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    final int current = getState();
                    // 计算可用许可数
                    final int next = current + releases;
                    if (next < current) {
                        throw new Error("Maximum permit count exceeded");
                    }
                    // 更新许可数
                    if (compareAndSetState(current, next)) {
                        return true;
                    }
                }
            }
    
            /**
             *  递减 reductions 个许可
             * created by ZXD at 15 Dec 2018 T 11:46:19
             * @param reductions
             */
            final void reducePermits(int reductions) {
                for (;;) {
                    final int current = getState();
                    final int next = current - reductions;
                    if (next > current) {
                        throw new Error("Permit count underflow");
                    }
                    if (compareAndSetState(current, next)) {
                        return;
                    }
                }
            }
    
            /**
             *  一次性获取全部许可
             * created by ZXD at 15 Dec 2018 T 11:46:41
             * @return
             */
            final int drainPermits() {
                for (;;) {
                    // 读取当前许可数
                    final int current = getState();
                    // 如果不是 0,则将其置为 0
                    if (current == 0 || compareAndSetState(current, 0)) {
                        // 返回读取到的许可数
                        return current;
                    }
                }
            }
        }
    
        /**
         *  非公平版本
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            @Override
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
        /**
         *  公平版本
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
    
            FairSync(int permits) {
                super(permits);
            }
    
            @Override
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    // 如果已经有线程在阻塞等待获取许可,则不允许获取
                    if (hasQueuedPredecessors()) {
                        return -1;
                    }
                    final int available = getState();
                    final int remaining = available - acquires;
                    if (remaining < 0 ||
                            compareAndSetState(available, remaining)) {
                        return remaining;
                    }
                }
            }
        }
    
        /**
         *  创建一个持有 permits 个许可的非公平信号量
         */
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        /**
         * 创建一个持有 permits 个许可的
         * true:公平信号量
         * false:公平信号量
         */
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    

    获取许可

    • 尝试获取一个许可,如果无许可可用,则阻塞等待,支持中断
        /**
         *  尝试获取一个许可,如果无许可可用,则阻塞等待
         *  1)获取到一个许可
         *  2)线程被中断
         */
        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    • 尝试获取 permits 个许可,如果无许可可用,则阻塞等待,支持中断
        /**
         *  尝试获取 permits 个许可,如果无许可可用,则阻塞等待
         *  1)获取到一个许可
         *  2)线程被中断
         */
        public void acquire(int permits) throws InterruptedException {
            if (permits < 0) {
                throw new IllegalArgumentException();
            }
            sync.acquireSharedInterruptibly(permits);
        }
    
    • 尝试获取一个许可,如果无许可可用,则阻塞等待,不支持线程中断
        /**
         *  尝试获取一个许可,如果无许可可用,则阻塞等待,不支持线程中断
         *  1)获取到一个许可
         */
        public void acquireUninterruptibly() {
            sync.acquireShared(1);
        }
    
    • 尝试获取 permits 个许可,如果无许可可用,则阻塞等待,不支持中断
        /**
         *  尝试获取 permits 个许可,如果无许可可用,则阻塞等待,不支持中断
         *  1)获取到一个许可
         */
        public void acquireUninterruptibly(int permits) {
            if (permits < 0) {
                throw new IllegalArgumentException();
            }
            sync.acquireShared(permits);
        }
    
    • 如果有许可可用,则一次性获取所有的许可,并返回许可数,否则返回 0
        /**
         *  如果有许可可用,则一次性获取所有的许可,并返回许可数,否则返回 0
         */
        public int drainPermits() {
            return sync.drainPermits();
        }
    

    释放许可

    • 将一个许可归还给信号量
        /**
         *  将一个许可归还给信号量
         */
        public void release() {
            sync.releaseShared(1);
        }
    
    • 将 permits 个许可归还给信号量
        /**
         * 将 permits 个许可归还给信号量
         */
        public void release(int permits) {
            if (permits < 0) {
                throw new IllegalArgumentException();
            }
            sync.releaseShared(permits);
        }
    
  • 相关阅读:
    git提交本地代码到远程服务器
    报错 D:Program Files odejs ode_cache\_logs2019-05-07T07_07_30_992Z-debug.log
    vue项目中使用插件将字符串装化为格式化的json数据(可伸缩)
    odoo官方文档第二章 Data Files
    odoo官方文档第一章 ORM
    odoo模块的创建 openacademy学习笔记
    mysql存储过程的学习(二)
    mysql存储过程的学习(一)
    linux 进入mysql的常用命令(转)
    Dubbo入门学习(转)
  • 原文地址:https://www.cnblogs.com/zhuxudong/p/10122899.html
Copyright © 2011-2022 走看看