正常的锁在任何时刻都只允许一个任务访问一项资源,而计数信号量允许n个任务同时访问这个资源。
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire()
,然后再获取该许可。每个 release()
添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore
只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:
class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } // Not a particularly efficient data structure; just for demo protected Object[] items = ... whatever kinds of items being managed protected boolean[] used = new boolean[MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; // not reached } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } }
获得一项前,每个线程必须从信号量获取许可,从而保证可以使用该项。该线程结束后,将项返回到池中并将许可返回到该信号量,从而允许其他线程获取该项。注意,调用 acquire()
时无法保持同步锁,因为这会阻止将项返回到池中。信号量封装所需的同步,以限制对池的访问,这同维持该池本身一致性所需的同步是分开的。
将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多 Lock
实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。
此类的构造方法可选地接受一个公平 参数。当设置为 false 时,此类不对线程获取许可的顺序做任何保证。特别地,闯入 是允许的,也就是说可以在已经等待的线程前为调用 acquire()
的线程分配一个许可,从逻辑上说,就是新线程将自己置于等待线程队列的头部。当公平设置为 true 时,信号量保证对于任何调用获取
方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、获得许可。注意,FIFO 排序必然应用到这些方法内的指定内部执行点。所以,可能某个线程先于另一个线程调用了 acquire
,但是却在该线程之后到达排序点,并且从方法返回时也类似。还要注意,非同步的 tryAcquire
方法不使用公平设置,而是使用任意可用的许可。
通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
此类还提供便捷的方法来同时 acquire
和释放
多个许可。小心,在未将公平设置为 true 时使用这些方法会增加不确定延期的风险。
package tij; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * Created by huaox on 2017/4/2. * */ class Pool<T>{ private int size; private List<T> items = new ArrayList<T>(); private volatile boolean[] checkedOut;//被签出的对象 private Semaphore semaphore; Pool(Class<T> tClass,int size) { this.size = size; checkedOut = new boolean[size]; semaphore = new Semaphore(size, true); for (int i = 0; i < size; i++) { try { items.add(tClass.newInstance()); } catch (IllegalAccessException | InstantiationException e) { e.printStackTrace(); } } } T checkOut() throws InterruptedException { semaphore.acquire();//在许可可用前将被阻塞,直到许可可用 return getItem(); } void checkIin(T item){ if (realease(item))// semaphore.release(); } synchronized T getItem(){ for (int i = 0; i < size; i++) if(!checkedOut[i]){ checkedOut[i]=true; return items.get(i); } return null; } synchronized boolean realease(T item){ int index = items.indexOf(item); if(index<0) return false; if(checkedOut[index]){ checkedOut[index]=false; return true; } return false; } } class Flat{ private volatile double d ; private static int count = 0; private final int id = count++; public Flat() { for (int i = 0; i < 10000; i++) { d+=(Math.E+Math.PI)/(double) i; } //System.out.println("creating tij.Flat object "+id+" "); } public String toString() { return "tij.Flat id "+id; } } class CheckOutTask<T> implements Runnable{ private static int count = 0; private final int id = count++; private Pool<T> pool; CheckOutTask(Pool<T> pool){ this.pool = pool; } public void run() { try { T item = pool.checkOut(); System.out.println(this + " is checking out "+ item+" "); TimeUnit.SECONDS.sleep(2); pool.checkIin(item); System.out.println(this + " is checking in "+ item+" "); }catch (InterruptedException e) { e.printStackTrace(); } } public String toString() { return "tij.CheckOutTask " + id; } } public class Test5 { private final static int SIZE = 5; public static void main(String[] args) throws Exception{ final Pool<Flat> pool = new Pool<>(Flat.class,SIZE); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < SIZE; i++) executorService.execute(new CheckOutTask<Flat>(pool)); System.out.println("All checkoutTask created"); List<Flat> list =new ArrayList<Flat>(); for (int i = 0; i < SIZE; i++) { Flat flat = pool.checkOut(); System.out.println("main "+i+" is checking out "+flat); list.add(flat); } Future<?> future = executorService.submit(new Runnable() { @Override public void run() { try { Flat flat = pool.checkOut(); System.out.println("another is checking "+flat); } catch (InterruptedException e) { System.out.println("InterruptedException form checkout"); } } }); TimeUnit.SECONDS.sleep(3); future.cancel(true); System.out.println("checking in objects in"+list); for (Flat flat : list) pool.checkIin(flat); executorService.shutdown(); } }
输出结果:
All checkoutTask created tij.CheckOutTask 0 is checking out tij.Flat id 0 tij.CheckOutTask 1 is checking out tij.Flat id 1 main 0 is checking out tij.Flat id 2 main 1 is checking out tij.Flat id 3 main 2 is checking out tij.Flat id 4 tij.CheckOutTask 1 is checking in tij.Flat id 1 tij.CheckOutTask 2 is checking out tij.Flat id 1 main 3 is checking out tij.Flat id 0 tij.CheckOutTask 0 is checking in tij.Flat id 0 tij.CheckOutTask 3 is checking out tij.Flat id 1 tij.CheckOutTask 2 is checking in tij.Flat id 1 tij.CheckOutTask 4 is checking out tij.Flat id 1 tij.CheckOutTask 3 is checking in tij.Flat id 1 tij.CheckOutTask 4 is checking in tij.Flat id 1 main 4 is checking out tij.Flat id 1 InterruptedException form checkout checking in objects in[tij.Flat id 2, tij.Flat id 3, tij.Flat id 4, tij.Flat id 0, tij.Flat id 1] Process finished with exit code 0