zoukankan      html  css  js  c++  java
  • java学习之Semaphore信号量

    一、需求发生场景

                新碰到一个项目,该项目中有一些接口的供应商对接口的并发量有所限制,所以解决方案就是用了Java中的Semaphore信号量。

    二、Semaphore信号量的理解

          Semaphore是java.util.concurrent包下的一个类。从JDK1.5开始开始引入的,Semaphore是内部的维护了一组虚拟的许可,其许可的数量是通过改构造函数的参数来指定。

         访问后端资源时,需要先获取此许可,获取此许可的方法acquire(),当资源使用完后,需要使用release方法释放许可。

         Semaphore可以理解成,一个公共厕所,由于厕所中的坑位是有限的,坑位就相当于许可,当进去一个人时,占了一个坑位那么就相当于调用了一下acquire方法,当从厕所出来一个人时空出了一个坑位,就相当于调用了一次release方法,Semaphore的许可为0时类似厕所中的坑位已经被占满了,后面的请求就会被阻塞就类似外面的人在厕所外排队等着。

    三、Semaphore的方法说明

    • acquire()           从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断
    • acquire(int permits)       从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断
    • acquireUninterruptibly()     从此信号量中获取许可,在有可用的许可前将其阻塞。
    • acquireUninterruptibly(int permits)    从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
    • availablePermits()      返回此信号量中当前可用的许可数。
    • drainPermits()         获取并返回立即可用的所有许可。
    • getQueueLength()        返回正在等待获取的线程的估计数目。
    • hasQueuedThreads()      查询是否有线程正在等待获取。
    • isFair()             如果此信号量的公平设置为 true,则返回 true
    • release()             释放一个许可,将其返回给信号量。
    • release(int permits)     释放给定数目的许可,将其返回到信号量。
    • toString()            返回标识此信号量的字符串,以及信号量的状态。
    • tryAcquire()           仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
    • tryAcquire(int permits)   仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。
    • tryAcquire(int permits, long timeout, TimeUnit unit)  如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
    • tryAcquire(long timeout, TimeUnit unit)  如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可。

    四、代码示例

     1 package com.ssc.Semaphore;
     2 
     3 import java.util.concurrent.Semaphore;
     4 import java.util.concurrent.TimeUnit;
     5 
     6 public class SemaphoreDemo {
     7     public static void main(String[] args){
     8         Runnable rester=new Runnable() {
     9             //厕所里现在只有五个坑位
    10             final Semaphore pitSemaphore=new Semaphore(5,true);
    11             int count=1;
    12             @Override
    13             public void run() {
    14                 int time =(int)(Math.random()*10+2);
    15                 int num=count++;
    16                 try{
    17                     //当前如厕者占用了一个坑
    18                     pitSemaphore.acquire();
    19                     System.out.println("第"+num+"个如厕者正在蹲坑,需要时间"+time+"秒;"+pitSemaphore.toString());
    20                     Thread.sleep(time*1000);
    21 
    22                     //判断厕所外是否有人再等待
    23                     if(pitSemaphore.hasQueuedThreads()){
    24                         //打印等待人数
    25                         System.out.println("厕所外的等待人数"+pitSemaphore.getQueueLength());
    26                     }
    27                     if(num==2){
    28                         System.out.println("查看许可数"+pitSemaphore.drainPermits()+"     "+pitSemaphore.availablePermits());
    29                         //查看当前阻塞队列是否使用的时公平锁
    30                         System.out.println("查看信号量是否公平:"+pitSemaphore.isFair());
    31                     }
    32                     //检查是否能获得许可
    33                     System.out.println("查看是否可获得许可"+pitSemaphore.tryAcquire());
    34                     System.out.println("查看是否可获得许可"+pitSemaphore.tryAcquire(2,100, TimeUnit.MICROSECONDS));
    35                 } catch (InterruptedException e) {
    36                     e.printStackTrace();
    37                 }finally {
    38                     pitSemaphore.release();
    39                     System.out.println("第"+num+"个如厕者出去了");
    40                 }
    41             }
    42         };
    43         for (int i=1;i<10;i++){
    44             new Thread(rester).start();
    45         }
    46     }
    47 }

    查看运行结果:

     五、源码分析

          1、创建信号量

           构造函数的参数主要有两种,第一种默认的时非公平锁的信号量  Semaphore semaphore=new Semaphore(2);   此构造函数源码为:

           

         另外一种是 Semaphore pitSemaphore=new Semaphore(5,true);  此构造函数的源码为

          通过此源码可以看出当第二参数为true则采用公平锁,否则采用的是非公平锁

              2、获取令牌

                        pitSemaphore.acquire();

                  对应的源码为:

    1 /** 获取一个信号量**/
    2 public void acquire() throws InterruptedException {
    3         sync.acquireSharedInterruptibly(1);
    4     }

            acquireSharedInterruptibly方法为:

    1     public final void acquireSharedInterruptibly(int arg)
    2             throws InterruptedException {
    3      //判断当前线程是否被中断
    4         if (Thread.interrupted())
    5             throw new InterruptedException();
    6 //尝试获得令牌,如果可用令牌不够申请的令牌数时arg,则会创建一个节点,将其加入阻塞队列,挂起当前线程
    7         if (tryAcquireShared(arg) < 0)
    8             doAcquireSharedInterruptibly(arg);
    9     }
    doAcquireSharedInterruptibly方法为:
     1 /**当前线程获取共享资源失败后,会调用此方法**/
     2     private void doAcquireSharedInterruptibly(int arg)  throws InterruptedException {
     3        //通过调用addWaiter方法把线程封装成node节点,并将该节点设置成共享模式,将该节点加到队列的尾部
     4         final Node node = addWaiter(Node.SHARED);
     5         boolean failed = true;
     6         try {
     7             for (;;) {
     8                //拿到节点的前驱
     9                 final Node p = node.predecessor();
    10               //如果该前驱是head,即证明该节点为第二位置,有资格去试图获取资源
    11                 if (p == head) {
    12                //试图获取许可
    13                     int r = tryAcquireShared(arg);
    14                     if (r >= 0) {
    15                   //调用setHeadAndPropagate方法把该节点设置为新的头节点,同时唤醒队列中所有共享类型的节点,去获取共享资源。
    16                         setHeadAndPropagate(node, r);
    17                         p.next = null; // help GC
    18                         failed = false;
    19                         return;
    20                     }
    21                 }
    22                //重组双向链表,清空无效节点,挂起当前线程
    23                 if (shouldParkAfterFailedAcquire(p, node) &&
    24                     parkAndCheckInterrupt())
    25                     throw new InterruptedException();
    26             }
    27         } finally {
    28             if (failed)
    29                 //取消获取动作
    30                 cancelAcquire(node);
    31         }
    32     }

    3、释放令牌

           pitSemaphore.release();

    1 ublic void release() {
    2        //未传值的情况下释放一个资源
    3         sync.releaseShared(1);
    4     }
    1  public final boolean releaseShared(int arg) {
    2 //获取共享模式资源释放,如果释放成功那么会调用doReleaseShared继续唤醒下一个节点
    3         if (tryReleaseShared(arg)) {
    4             doReleaseShared();
    5             return true;
    6         }
    7         return false;
    8     }
     1  private void doReleaseShared() {
     2         for (;;) {
     3             Node h = head;
     4             if (h != null && h != tail) {
     5                 int ws = h.waitStatus;
     6              //Node.SIGNAL的为-1
     7                 if (ws == Node.SIGNAL) {
     8 //更新当前现成的状态值为0,如果失败则继续
     9                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    10                         continue;            // loop to recheck cases
    11 //unparkSuccessor()此方法是唤醒共享锁的第一个节点。如果本身头节点属于重置状态waitStatus==0,并且把它设置为传播状态那么就向下一个节点传播。
    12                     unparkSuccessor(h);
    13                 }
    14                 else if (ws == 0 &&
    15                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    16                     continue;                // loop on failed CAS
    17             }
    18             if (h == head)                   // loop if head changed
    19                 break;
    20         }
    21     }



    主要参考资料:
    Semaphore使用及原理
    AQS共享锁模式

      

    生于忧患,死于安乐
  • 相关阅读:
    Python文件基础
    Python字符串基础操作
    Python ===if while for语句 以及一个小小网络爬虫实例
    Python 常用函数大体分类
    Atlas安装及配置
    (转)VS无法启动调试:“生成下面的模块时,启用了优化或没有调试信息“
    (转)数据库中视图的作用
    (转)ASP.NET MVC 学习第一天
    (转)介绍几个C#正则表达式工具
    关于textbox.attributes["value"]的问题
  • 原文地址:https://www.cnblogs.com/songlove/p/15333552.html
Copyright © 2011-2022 走看看