1. 使用指南
package com.multthread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { //由于信号量是计数器递增,初始值可以随便设置 static volatile Semaphore sh = new Semaphore(2); public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(2); // 将任务A加入线程池 es.submit(()->{ try { System.out.println("t1..."); sh.release(); }catch (Exception e){} }); // 将任务B加入线程池 es.submit(()->{ try { System.out.println("t2..."); sh.release(); }catch (Exception e){} }); // 等待子线程执行完release方法返回,注意这里release可以是同一个线程执行,只要调用了两次就行 // 此函数入参=当初始信号计数+调用次数时,才会放行,同时将计数器state重置为0 sh.acquire(4); // 将任务C加入到线程池 es.submit(()->{ try { Thread.sleep(100); System.out.println("t1..."); sh.release(); }catch (Exception e){} }); // 将任务D加入到线程池 es.submit(()->{ try { System.out.println("t2..."); sh.release(); }catch (Exception e){} }); //由于state被重置为0了,所有所以这里入参写调用次数 sh.acquire(2); System.out.println("main....."); es.shutdown(); } }
2.
基于AQS实现,与CountDownLatch不同的是,Semaphore内部的计数器是递增的。初始化的时候可以执行一个计数器的值,但是需要在需要同步的地方调用acquire方法执行需要同步的线程数。并且,内部的AQS实现(sync)获取信号量有公平策略和非公平策略之分。
3. 源码分析
- 构造函数
// 构造函数,默认采用非公平策略 public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
- release函数
public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) {
// 尝试释放资源 if (tryReleaseShared(arg)) {
// 资源释放成功则调用park方法唤醒aqs队列里面最先挂起的线程 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) {
// CAS循环修改state值,直到修改成功 for (;;) {
// 获取当前的信号量值 int current = getState();
// 信号量值加releases,即+1 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded");
// 使用CAS更新state的值 if (compareAndSetState(current, next)) return true; } } // 释放资源完毕,调用唤醒挂起线程 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
- acquire方法
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取 if (tryAcquireShared(arg) < 0)
// 如果获取失败则加入到阻塞队列,然后再次尝试,如果失败则调用park方法挂起当前线程 doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }