zoukankan      html  css  js  c++  java
  • 【JDK】JDK源码分析-Semaphore

    概述

    Semaphore 是并发包中的一个工具类,可理解为信号量。通常可以作为限流器使用,即限制访问某个资源的线程个数,比如用于限制连接池的连接数。

    打个通俗的比方,可以把 Semaphore 理解为一辆公交车:车上的座位数(初始的“许可” permits 数量)是固定的,行驶期间如果有人上车(获取许可),座位数(许可数量)就会减少,当人满的时候不能再继续上车了(获取许可失败);而有人下车(释放许可)后就空出了一些座位,其他人就可以继续上车了。

    下面具体分析其代码实现。

    代码分析

    Semaphore 的方法如下:

    其中主要方法是 acquire() 和 release() 相关的一系列方法,它们的作用类似。我们先从构造器开始分析。

    构造器

    private final Sync sync;
    
    // 初始化 Semaphore,传入指定的许可数量,非公平
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    // 初始化 Semaphore,传入指定的许可数量,指定是否公平
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    构造器初始化了 Sync 变量,根据传入的 fair 值指定为 FairSync 或 NonFairSync,下面分析这三个类。

    内部嵌套类 Sync:

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        
        // 构造器,将父类 AQS 的 state 变量初始化为给定的 permits
        Sync(int permits) {
            setState(permits);
        }
    
        // 非公平方式尝试获取许可(减少 state 的值)
        final int nonfairTryAcquireShared(int acquires) {
            // 自旋操作
            for (;;) {
                // 获取许可值(state),并尝试 CAS 修改为减去后的结果
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    
        // 释放许可(增加 state 的值)
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                // 操作与获取类似,不同的在于此处是增加 state 值
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        
        // 一些方法未给出...
    }

    可以看到 Sync 类继承自 AQS,并重写了 AQS 的 tryReleaseShared 方法,其中获取和释放许可分别对应的是对 AQS 中 state 值的减法和加法操作。具体可参考前文对 AQS 共享模式的分析「JDK源码分析-AbstractQueuedSynchronizer(3)」。

    NonFairSync (非公平版本实现):

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
    
        // 调用父类 Sync 的构造器来实现
        NonfairSync(int permits) {
            super(permits);
        }
        // 重写 AQS 的 tryAcquireShared 方法,代码实现在父类 Sync 中
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    FairSync (公平版本实现):

    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
        
        // 构造器调用父类 Sync 的构造器来实现
        FairSync(int permits) {
            super(permits);
        }
        
        // 重写 AQS 的 tryAcquireShared 方法,尝试获取许可(permit)
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 若队列中有其他线程等待,则获取失败(这就是体现“公平”的地方)
                if (hasQueuedPredecessors())
                    return -1;
                // 获取当前的许可值
                int available = getState();
                // 计算剩余值
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    PS: 体现“公平”的地方在于 tryAcquireShared 方法中,公平的版本会先判断队列中是否有其它线程在等待(hasQueuedPredecessors 方法)。

    主要方法的代码实现:

    // 获取一个许可(可中断)
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    // 获取一个许可(不响应中断)
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    
    // 尝试获取一个许可
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
    
    // 尝试获取一个许可(有超时等待)
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    // 释放一个许可
    public void release() {
        sync.releaseShared(1);
    }

    还有一系列类似的操作,只不过获取/释放许可的数量可以指定:

    // 获取指定数量的许可(可中断)
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    
    // 获取指定数量的许可(不可中断)
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }
    
    // 尝试获取指定数量的许可
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
    
    // 尝试获取指定数量的许可(有超时等待)
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }
    
    // 释放指定数量的许可
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    可以看到,Semaphore 的主要方法都是在嵌套类 FairSync 和 NonFairSync 及其父类 Sync 中实现的,内部嵌套类也是 AQS 的典型用法。

    场景举例

    为了便于理解 Semaphore 的用法,下面简单举例分析(仅供参考):

    public class SemaphoreTest {
      public static void main(String[] args) {
        // 初始化 Semaphore
        // 这里的许可数为 2,即同时最多有 2 个线程可以获取到
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 50; i++) {
          new Thread(() -> {
            try {
              // 获取许可
              semaphore.acquire();
              System.out.println(Thread.currentThread().getName() + " 正在执行..");
              // 模拟操作
              TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
              e.printStackTrace();
            } finally {
              // 释放许可
              semaphore.release();
            }
          }).start();
        }
      }
    }
    /*  执行结果(仅供参考):
        Thread-0 正在执行..
        Thread-1 正在执行..
        Thread-2 正在执行..
        Thread-3 正在执行..
        ...
    */

    这里把 Semaphore 的初始许可值设为 2,表示最多有两个线程可同时获取到许可(运行程序可发现线程是两两一起执行的)。设置为其他值也是类似的。

    比较特殊的是,如果把 Semaphore 的初始许可值设为 1,可以当做“互斥锁”来使用。

    小结

    Semaphore 是并发包中的一个工具类,其内部是基于 AQS 共享模式实现的。通常可以作为限流器使用,比如限定连接池等的大小。

    相关阅读:

    JDK源码分析-AbstractQueuedSynchronizer(3)

    Stay hungry, stay foolish.

    PS: 本文首发于微信公众号【WriteOnRead】。

  • 相关阅读:
    指针与数组
    深入函数
    到底是使用指针还是引用 ,混合使用以及易错点
    返回值作为标志
    c++的引用(二)
    内联函数
    c++的引用
    指针总结以及常量指针与指向常量的指针与指向常量的常指针
    c++中的 堆和栈
    Java Messages Synchronous and Asynchronous
  • 原文地址:https://www.cnblogs.com/jaxer/p/11331043.html
Copyright © 2011-2022 走看看