Semaphore可以控制并发数量,通过分发许可证的方式,acquire就是去获取许可证,如果获取到了的话,就可以执行,如果获取不到的话,就会去同步队列里阻塞.
release会释放许可证并唤醒同步队列的线程.
public Semaphore(int permits) { sync = new NonfairSync(permits); }
默认是非公平,permits就是AQS中state的值
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
可以看出就是通过减少state的值,如果剩下的值小于0的话,直接返回remaining,线程就会进入阻塞队列,大于0的话,就通过CAS改变state的值
因为并发可能有多个线程来修改state的值,所以通过cas和自璇来保证线程安全
再来看release方法
public void release() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
同样是通过cas的方式来归还许可证,并通过cas重新设置state的值
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
唤醒等待队列里的第一个节点