概述
信号量(Semaphore)控制同时访问资源的线程数量,支持公平和非公平两种方式获取许可。
使用
提供的方法
1 public Semaphore(int permits) //permits为许可数,默认非公平方式 2 public Semaphore(int permits, boolean fair) 3 4 //获取一个许可。若获取成功,permits-1,直接返回;否则当前线程阻塞直到有permits被释放,除非线程被中断 5 //如果线程被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。 6 public void acquire() throws InterruptedException 7 //忽略中断 8 public void acquireUninterruptibly() 9 //尝试获取一个许可,成功返回true,否则返回false。 10 //即使已将此信号量设置为使用公平排序策略,但是调用 tryAcquire() 也将 立即获取许可(如果有一个可用),而不管当前是否有正在等待的线程。 11 public boolean tryAcquire() 12 //超时尝试获取一个许可,该方法遵循公平设置 13 public boolean tryAcquire(long timeout, TimeUnit unit) 14 //释放一个许可 15 public void release() 16 17 //以上方法都是获取或释放一个许可,每个方法都存在对应的获取或释放指定个数许可的方法。例如public boolean tryAcquire(int permits)
使用示例:
使用信号量实现对内容池(例如线程池)的访问。
1 class Pool { 2 private static final int MAX_AVAILABLE = 100; //许可数为100,在本例中也是内容池的item的个数。 3 private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); 4 5 //获取池中的一个item,首先从Semaphore获取许可;获取许可成功后,从池中获取一个可用的item,并把该item标记为已使用。 6 public Object getItem() throws InterruptedException { 7 available.acquire(); 8 return getNextAvailableItem(); 9 } 10 11 //将指定的item释放到池中,如果markAsUnused返回true,释放Semaphore的一个许可。 12 public void putItem(Object x) { 13 if (markAsUnused(x)) 14 available.release(); 15 } 16 17 // Not a particularly efficient data structure; just for demo 18 19 protected Object[] items = ... whatever kinds of items being managed //内容池,例如:连接池,每个item代表一个连接。 20 protected boolean[] used = new boolean[MAX_AVAILABLE]; //标记池中的每个item是否已经被占用 21 22 protected synchronized Object getNextAvailableItem() { 23 for (int i = 0; i < MAX_AVAILABLE; ++i) { 24 if (!used[i]) { 25 used[i] = true; 26 return items[i]; 27 } 28 } 29 return null; // not reached 30 } 31 32 protected synchronized boolean markAsUnused(Object item) { 33 for (int i = 0; i < MAX_AVAILABLE; ++i) { 34 if (item == items[i]) { 35 if (used[i]) { 36 used[i] = false; 37 return true; 38 } else 39 return false; 40 } 41 } 42 return false; 43 } 44 }
实现原理
基于AQS实现,用同步状态(state)表示许可数(permits),使用AQS的共享式获取和释放同步状态来实现permits的获取和释放。
域
1 private final Sync sync;
Sync是Semaphore的抽象内部类,继承了AQS。它有两个子类NonfairSync和FairSync,分别是非公平同步器和公平同步器。
Sync的源码:
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } //非公平共享式获取同步状态。自旋CAS获取同步状态直到成功或许可不足。 //返回值语义:负数代表获取失败、0代表获取成功但没有剩余资源、正数代表获取成功,还有剩余资源。 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) //此处CAS时有可能同步队列中已有等待的线程,就导致的不公平性 return remaining; } } //自定义共享式释放同步状态。自旋CAS释放同步状态直到成功,除非overflow //返回值语义:true表示成功,不可能释放失败,除非overflow protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
NonfairSync(非公平)源码:
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
FairSync(公平)源码:
1 static final class FairSync extends Sync { 2 private static final long serialVersionUID = 2014338818796000944L; 3 4 FairSync(int permits) { 5 super(permits); 6 } 7 //自定义共享式释放同步状态。 8 protected int tryAcquireShared(int acquires) { 9 for (;;) { 10 //首先检查同步队列中是否有前驱。如果有则返回失败,将当前线程加入到同步队列的尾部,保证先尝试获取同步状态的线程先成功。 11 if (hasQueuedPredecessors()) 12 return -1; 13 int available = getState(); 14 int remaining = available - acquires; 15 if (remaining < 0 || 16 compareAndSetState(available, remaining)) 17 return remaining; 18 } 19 } 20 }
方法
除tryAcquire外,都是通过调用AQS提供的方法实现获取失败时的阻塞和唤醒机制,具体策略建AQS的源码。
1 //阻塞式获取一个许可,响应中断 2 public void acquire() throws InterruptedException { 3 sync.acquireSharedInterruptibly(1); //调用AQS提供的可响应中断共享式获取同步状态方法 4 } 5 6 //阻塞式获取一个许可,忽略中断 7 public void acquireUninterruptibly() { 8 sync.acquireShared(1); 9 } 10 11 //非阻塞式获取一个许可 12 public boolean tryAcquire() { 13 return sync.nonfairTryAcquireShared(1) >= 0; 14 } 15 16 //释放一个许可 17 public void release() { 18 sync.releaseShared(1); 19 }
参考资料
JDK DOC
《Java并发编程的艺术》