一、需求发生场景
新碰到一个项目,该项目中有一些接口的供应商对接口的并发量有所限制,所以解决方案就是用了Java中的Semaphore信号量。
二、Semaphore信号量的理解
Semaphore是java.util.concurrent包下的一个类。从JDK1.5开始开始引入的,Semaphore是内部的维护了一组虚拟的许可,其许可的数量是通过改构造函数的参数来指定。
访问后端资源时,需要先获取此许可,获取此许可的方法acquire(),当资源使用完后,需要使用release方法释放许可。
Semaphore可以理解成,一个公共厕所,由于厕所中的坑位是有限的,坑位就相当于许可,当进去一个人时,占了一个坑位那么就相当于调用了一下acquire方法,当从厕所出来一个人时空出了一个坑位,就相当于调用了一次release方法,Semaphore的许可为0时类似厕所中的坑位已经被占满了,后面的请求就会被阻塞就类似外面的人在厕所外排队等着。
三、Semaphore的方法说明
acquire()
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。acquire(int permits) 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
acquireUninterruptibly() 从此信号量中获取许可,在有可用的许可前将其阻塞。
acquireUninterruptibly(int permits) 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
availablePermits() 返回此信号量中当前可用的许可数。
drainPermits() 获取并返回立即可用的所有许可。
getQueueLength() 返回正在等待获取的线程的估计数目。
hasQueuedThreads() 查询是否有线程正在等待获取。
isFair() 如果此信号量的公平设置为 true,则返回
true
。release() 释放一个许可,将其返回给信号量。
release(int permits) 释放给定数目的许可,将其返回到信号量。
toString() 返回标识此信号量的字符串,以及信号量的状态。
tryAcquire() 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
tryAcquire(int permits) 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
tryAcquire(int permits, long timeout, TimeUnit unit) 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
tryAcquire(long timeout, TimeUnit unit) 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。
四、代码示例
1 package com.ssc.Semaphore; 2 3 import java.util.concurrent.Semaphore; 4 import java.util.concurrent.TimeUnit; 5 6 public class SemaphoreDemo { 7 public static void main(String[] args){ 8 Runnable rester=new Runnable() { 9 //厕所里现在只有五个坑位 10 final Semaphore pitSemaphore=new Semaphore(5,true); 11 int count=1; 12 @Override 13 public void run() { 14 int time =(int)(Math.random()*10+2); 15 int num=count++; 16 try{ 17 //当前如厕者占用了一个坑 18 pitSemaphore.acquire(); 19 System.out.println("第"+num+"个如厕者正在蹲坑,需要时间"+time+"秒;"+pitSemaphore.toString()); 20 Thread.sleep(time*1000); 21 22 //判断厕所外是否有人再等待 23 if(pitSemaphore.hasQueuedThreads()){ 24 //打印等待人数 25 System.out.println("厕所外的等待人数"+pitSemaphore.getQueueLength()); 26 } 27 if(num==2){ 28 System.out.println("查看许可数"+pitSemaphore.drainPermits()+" "+pitSemaphore.availablePermits()); 29 //查看当前阻塞队列是否使用的时公平锁 30 System.out.println("查看信号量是否公平:"+pitSemaphore.isFair()); 31 } 32 //检查是否能获得许可 33 System.out.println("查看是否可获得许可"+pitSemaphore.tryAcquire()); 34 System.out.println("查看是否可获得许可"+pitSemaphore.tryAcquire(2,100, TimeUnit.MICROSECONDS)); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 }finally { 38 pitSemaphore.release(); 39 System.out.println("第"+num+"个如厕者出去了"); 40 } 41 } 42 }; 43 for (int i=1;i<10;i++){ 44 new Thread(rester).start(); 45 } 46 } 47 }
查看运行结果:
五、源码分析
1、创建信号量
构造函数的参数主要有两种,第一种默认的时非公平锁的信号量 Semaphore semaphore=new Semaphore(2); 此构造函数源码为:
另外一种是 Semaphore pitSemaphore=new Semaphore(5,true); 此构造函数的源码为
通过此源码可以看出当第二参数为true则采用公平锁,否则采用的是非公平锁
2、获取令牌
pitSemaphore.acquire();
对应的源码为:
1 /** 获取一个信号量**/ 2 public void acquire() throws InterruptedException { 3 sync.acquireSharedInterruptibly(1); 4 }
acquireSharedInterruptibly方法为:
1 public final void acquireSharedInterruptibly(int arg) 2 throws InterruptedException { 3 //判断当前线程是否被中断 4 if (Thread.interrupted()) 5 throw new InterruptedException(); 6 //尝试获得令牌,如果可用令牌不够申请的令牌数时arg,则会创建一个节点,将其加入阻塞队列,挂起当前线程 7 if (tryAcquireShared(arg) < 0) 8 doAcquireSharedInterruptibly(arg); 9 }
doAcquireSharedInterruptibly方法为:
1 /**当前线程获取共享资源失败后,会调用此方法**/ 2 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { 3 //通过调用addWaiter方法把线程封装成node节点,并将该节点设置成共享模式,将该节点加到队列的尾部 4 final Node node = addWaiter(Node.SHARED); 5 boolean failed = true; 6 try { 7 for (;;) { 8 //拿到节点的前驱 9 final Node p = node.predecessor(); 10 //如果该前驱是head,即证明该节点为第二位置,有资格去试图获取资源 11 if (p == head) { 12 //试图获取许可 13 int r = tryAcquireShared(arg); 14 if (r >= 0) { 15 //调用setHeadAndPropagate方法把该节点设置为新的头节点,同时唤醒队列中所有共享类型的节点,去获取共享资源。 16 setHeadAndPropagate(node, r); 17 p.next = null; // help GC 18 failed = false; 19 return; 20 } 21 } 22 //重组双向链表,清空无效节点,挂起当前线程 23 if (shouldParkAfterFailedAcquire(p, node) && 24 parkAndCheckInterrupt()) 25 throw new InterruptedException(); 26 } 27 } finally { 28 if (failed) 29 //取消获取动作 30 cancelAcquire(node); 31 } 32 }
3、释放令牌
pitSemaphore.release();
1 ublic void release() { 2 //未传值的情况下释放一个资源 3 sync.releaseShared(1); 4 }
1 public final boolean releaseShared(int arg) { 2 //获取共享模式资源释放,如果释放成功那么会调用doReleaseShared继续唤醒下一个节点 3 if (tryReleaseShared(arg)) { 4 doReleaseShared(); 5 return true; 6 } 7 return false; 8 }
1 private void doReleaseShared() { 2 for (;;) { 3 Node h = head; 4 if (h != null && h != tail) { 5 int ws = h.waitStatus; 6 //Node.SIGNAL的为-1 7 if (ws == Node.SIGNAL) { 8 //更新当前现成的状态值为0,如果失败则继续 9 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 10 continue; // loop to recheck cases 11 //unparkSuccessor()此方法是唤醒共享锁的第一个节点。如果本身头节点属于重置状态waitStatus==0,并且把它设置为传播状态那么就向下一个节点传播。 12 unparkSuccessor(h); 13 } 14 else if (ws == 0 && 15 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 16 continue; // loop on failed CAS 17 } 18 if (h == head) // loop if head changed 19 break; 20 } 21 }
主要参考资料:
Semaphore使用及原理
AQS共享锁模式