zoukankan      html  css  js  c++  java
  • 并发编程Semaphore详解

    Semaphore的作用:限制线程并发的数量

    位于 java.util.concurrent 下,

    构造方法

        // 构造函数  代表同一时间,最多允许permits执行acquire() 和release() 之间的代码。
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        //False:表示非公平信号量,即线程启动的顺序与调用semaphore.acquire() 的顺序无关,也就是线程先启动了并不代表先获得 许可
       //True:公平信号量,即线程启动的顺序与调用semaphore.acquire() 的顺序有关,也就是先启动的线程优先获得许可
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }

    1. 方法acquire(n) 的功能是每调用1次此方法,就消耗掉n个许可。
    2. 方法release(n) 的功能是每调用1次此方法,就动态添加n个许可。
    3. 方法acquireUnnterruptibly()作用是是等待进入acquire() 方法的线程不允许被中断。
    4. 方法availablePermits() 返回Semaphore对象中当前可以用的许可数。
    5. 方法drainPermits() 获取并返回所有的许可个数,并且将可用的许可重置为0
    6. 方法 getQueueLength() 的作用是取得等待的许可的线程个数
    7. 方法 hasQueueThreads() 的作用是判断有没有线程在等待这个许可
    8. 方法tryAcquire() 的作用是尝试获取1个许可。如果获取不到则返回false,通常与if语句结合使用,其具有无阻塞的特点。无阻塞的特点可以使不至于在同步处于一直持续等待的状态。
    9. 方法tryAcquire(n) 的作用是尝试获取n个许可,如果获取不到则返回false
    10. 方法tryAcquire(long timeout,TimeUnit unit)的作用是在指定的时间内尝试获取1个许可,如果获取不到则返回false
    11. 方法tryAcquire(int permits,long timeout,TimeUnit unit) 的作用是在指定的时间内尝试获取n 个许可,如果获取不到则返回false

    使用:

    多进路-多处理-多出路:允许多个线程同时处理任务

        private static Semaphore semaphore = new Semaphore(4);
    
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(new Runnable(){
                    @Override
                    public void run(){
                        System.out.println(Thread.currentThread().getName()+"  正在运行");
                        try {
                            semaphore.acquire();// //请求获得许可,如果有可获得的许可则继续往下执行,许可数减1。否则进入阻塞状态
                            System.out.println(Thread.currentThread().getName() + "进入,并发数:" + (4-semaphore.availablePermits()));
                            Thread.sleep(3000);
                            System.out.println(Thread.currentThread().getName() +"即将离开,并发数:" + (4-semaphore.availablePermits()));
                            semaphore.release();//释放许可,许可数加1
                        }catch( Exception e){
                            e.printStackTrace();
                        } finally {
                        }
                    }
                }).start();
            }
        }

    运行结果:

    Thread-1  正在运行
    Thread-6  正在运行
    Thread-7  正在运行
    Thread-5  正在运行
    Thread-3  正在运行
    Thread-2  正在运行
    Thread-4  正在运行
    Thread-0  正在运行
    Thread-5进入,并发数:4
    Thread-9  正在运行
    Thread-7进入,并发数:3
    Thread-8  正在运行
    Thread-6进入,并发数:2
    Thread-1进入,并发数:1
    Thread-7即将离开,并发数:4
    Thread-5即将离开,并发数:4
    Thread-1即将离开,并发数:4
    Thread-2进入,并发数:4
    Thread-6即将离开,并发数:4
    Thread-4进入,并发数:4
    Thread-0进入,并发数:4
    Thread-3进入,并发数:3
    Thread-3即将离开,并发数:4
    Thread-4即将离开,并发数:4
    Thread-0即将离开,并发数:4
    Thread-2即将离开,并发数:4
    Thread-8进入,并发数:4
    Thread-9进入,并发数:4
    Thread-8即将离开,并发数:2
    Thread-9即将离开,并发数:2

    当使用:公平锁时:  private static Semaphore semaphore = new Semaphore(num,true);

    仔细看结果,就会发现先运行的线程会优先处理

    Thread-1  正在运行
    Thread-5  正在运行
    Thread-5进入,并发数:2
    Thread-4  正在运行
    Thread-4进入,并发数:3
    Thread-2  正在运行
    Thread-2进入,并发数:4
    Thread-3  正在运行
    Thread-0  正在运行
    Thread-9  正在运行
    Thread-8  正在运行
    Thread-7  正在运行
    Thread-6  正在运行
    Thread-1进入,并发数:1
    Thread-4即将离开,并发数:4
    Thread-5即将离开,并发数:4
    Thread-3进入,并发数:4
    Thread-2即将离开,并发数:4
    Thread-0进入,并发数:4
    Thread-9进入,并发数:4
    Thread-1即将离开,并发数:3
    Thread-8进入,并发数:4
    Thread-3即将离开,并发数:4
    Thread-7进入,并发数:4
    Thread-9即将离开,并发数:4
    Thread-8即将离开,并发数:4
    Thread-0即将离开,并发数:4
    Thread-6进入,并发数:4
    Thread-7即将离开,并发数:2
    Thread-6即将离开,并发数:1
    View Code

    2、多进路-单处理-多出路:允许多个线程同时处理任务,但是顺序却是同步的,也就是阻塞的。所以也称单处理。

    在代码中加入ReentrantLock对象 ,或者使用synchronized 代码块,保存代码的同步性

    或者  上面代码  private static Semaphore semaphore = new Semaphore(1);

    运行结果:

    Thread-0  正在运行
    Thread-7  正在运行
    Thread-6  正在运行
    Thread-2  正在运行
    Thread-4  正在运行
    Thread-3  正在运行
    Thread-5  正在运行
    Thread-1  正在运行
    Thread-9  正在运行
    Thread-0进入,并发数:1
    Thread-8  正在运行
    Thread-0即将离开,并发数:1
    Thread-7进入,并发数:1
    Thread-7即将离开,并发数:1
    Thread-6进入,并发数:1
    Thread-6即将离开,并发数:1
    Thread-2进入,并发数:1
    Thread-2即将离开,并发数:1
    Thread-4进入,并发数:1
    Thread-4即将离开,并发数:1
    Thread-3进入,并发数:1
    Thread-3即将离开,并发数:1
    Thread-5进入,并发数:1
    Thread-5即将离开,并发数:1
    Thread-1进入,并发数:1
    Thread-1即将离开,并发数:1
    Thread-9进入,并发数:1
    Thread-9即将离开,并发数:1
    Thread-8进入,并发数:1
    Thread-8即将离开,并发数:1

    此时,实现了互斥锁的功能。

    源码解析

    Semaphore内部基于AQS的共享模式,所以实现都委托给了Sync类。
    这里就看一下NonfairSync的构造方法:

    NonfairSync(int permits) {
         super(permits);
    }

    可以看到直接调用了父类的构造方法,Sync的构造方法如下:

    Sync(int permits) {
         setState(permits);
    }

    可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。

    获取许可

    先从获取一个许可看起,并且先看非公平模式下的实现。首先看acquire方法,acquire方法有几个重载,但主要是下面这个方法

    public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }

    从上面可以看到,调用了Sync的acquireSharedInterruptibly方法,该方法在父类AQS中,如下:

    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            //如果线程被中断了,抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
            //获取许可失败,将线程加入到等待队列中
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }

    AQS子类如果要使用共享模式的话,需要实现tryAcquireShared方法,下面看NonfairSync的该方法实现:

    protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }

    该方法调用了父类中的nonfairTyAcquireShared方法,如下:

    final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    //获取剩余许可数量
                    int available = getState();
                    //计算给完这次许可数量后的个数
                    int remaining = available - acquires;
                    //如果许可不够或者可以将许可数量重置的话,返回
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }

    从上面可以看到,只有在许可不够时返回值才会小于0,其余返回的都是剩余许可数量,这也就解释了,一旦许可不够,后面的线程将会阻塞。看完了非公平的获取,再看下公平的获取,代码如下:

    protected int tryAcquireShared(int acquires) {
                for (;;) {
                    //如果前面有线程再等待,直接返回-1
                    if (hasQueuedPredecessors())
                        return -1;
                    //后面与非公平一样
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }

    从上面可以看到,FairSync与NonFairSync的区别就在于会首先判断当前队列中有没有线程在等待,如果有,就老老实实进入到等待队列;而不像NonfairSync一样首先试一把,说不定就恰好获得了一个许可,这样就可以插队了。
    看完了获取许可后,再看一下释放许可。

    释放许可

    释放许可也有几个重载方法,但都会调用下面这个带参数的方法,

    public void release(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.releaseShared(permits);
        }

    releaseShared方法在AQS中,如下:

    public final boolean releaseShared(int arg) {
            //如果改变许可数量成功
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }

    AQS子类实现共享模式的类需要实现tryReleaseShared类来判断是否释放成功,实现如下:

    protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    //获取当前许可数量
                    int current = getState();
                    //计算回收后的数量
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    //CAS改变许可数量成功,返回true
                    if (compareAndSetState(current, next))
                        return true;
                }
            }

    从上面可以看到,一旦CAS改变许可数量成功,那么就会调用doReleaseShared()方法释放阻塞的线

    减小许可数量

    Semaphore还有减小许可数量的方法,该方法可以用于用于当资源用完不能再用时,这时就可以减小许可证。代码如下:

    protected void reducePermits(int reduction) {
            if (reduction < 0) throw new IllegalArgumentException();
            sync.reducePermits(reduction);
        }

    可以看到,委托给了Sync,Sync的reducePermits方法如下:

     final void reducePermits(int reductions) {
                for (;;) {
                    //得到当前剩余许可数量
                    int current = getState();
                    //得到减完之后的许可数量
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    //如果CAS改变成功
                    if (compareAndSetState(current, next))
                        return;
                }
            }

    从上面可以看到,就是CAS改变AQS中的state变量,因为该变量代表许可证的数量。

    获取剩余许可数量

    Semaphore还可以一次将剩余的许可数量全部取走,该方法是drain方法,如下:

    public int drainPermits() {
            return sync.drainPermits();
        }

    Sync的实现如下:

    final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }

    可以看到,就是CAS将许可数量置为0。

    总结

    Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程将会被挂起;而一旦有一个线程释放一个资源,那么就有可能重新唤醒等待队列中的线程继续执行。

    参考: https://www.cnblogs.com/wujiaofen/p/11356436.html

    ————————————————
    版权声明:本文为CSDN博主「xingfeng_coder」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_19431333/article/details/70212663

  • 相关阅读:
    虚拟化技术
    软件产业的知识经济 (蔡学墉)
    关于内存对齐
    Reverse Engineering
    [转]今天的操作系统 
    BasicBIOS & CMOS
    [bbk5355]第18集 Chapter 08 Handling Exceptions(01)
    [bbk1452]第1集 在Apache中使用SSL
    Linux>User Manager
    如何更新linux系统时间
  • 原文地址:https://www.cnblogs.com/yrjns/p/12199271.html
Copyright © 2011-2022 走看看