zoukankan      html  css  js  c++  java
  • 聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore

    前几篇分析了一下AQS的原理和实现。这篇拿Semaphore信号量做样例看看AQS实际是怎样使用的。


    Semaphore表示了一种能够同一时候有多个线程进入临界区的同步器,它维护了一个状态表示可用的票据,仅仅有拿到了票据的线程尽能够进入临界区,否则就等待。直到获得释放出的票据。

    Semaphore经常使用在资源池中来管理资源。当状态仅仅有1个0两个值时,它退化成了一个相互排斥的同步器。类似锁。


    以下来看看Semaphore的代码。

    它维护了一个内部类Sync来继承AQS,定制tryXXX方法来使用AQS。

    我们之前提到过AQS支持独占和共享两种模式,Semaphore明显就是共享模式。它支持多个线程能够同一时候进入临界区。所以Sync扩展了Shared相关的方法。

    能够看到Sync的主要操作都是对状态的无锁改动,它不须要处理AQS队列相关的操作。在聊聊高并发(二十四)解析java.util.concurrent各个组件(六) 深入理解AQS(四) 我们说了AQS提供了tryXXX接口给子类扩展,相当于给子类一个机会,能够自己处理状态,决定是否入同步队列。


    1. nonfailTryAcquireShared()非公平的tryAcquire,它立马改动了票据状态,而不须要管是否有先来的线程正在等待,而一旦有可用的票据,就直接获得了锁,不须要进入AQS的队列等待同步。

    2. tryReleaseShared()方法负责释放共享状态的资源,它仅仅改动了票据状态。由AQS的releaseShared()方法来负责唤醒在AQS队列等待的线程

    3. reducePermits()和drainPermits()方法都是直接改动了状态,从而限制可用的资源

    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                setState(permits);
            }
    
            final int getPermits() {
                return getState();
            }
    
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    
            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    if (compareAndSetState(current, next))
                        return;
                }
            }
    
            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }
        }

    Sync也是一个抽象类,详细的实现是NonfailSync和FairSync。代表了非公平实现和公平实现。在上一篇已经提到,所谓的非公平仅仅是说在获取资源时开了一个口子。能够让后来的线程不须要管在AQS队列中的先来的线程来获取资源。而一旦获取失败,就得进入AQS队列等待,而AQS队列是先来先服务的FIFO队列。

    能够看到,NonfailSync和FairSync仅仅是在tryAcquireShared方法的实现上不同,其它都是一样的。

    /**
         * NonFair version
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
        /**
         * Fair version
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
    
            FairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }

    再来看看Semaphore自己提供的方法,

    1.支持可中断和不可中断的获取/释放

    2.支持限时获取

    3.支持tryXX获取/释放

    4. 支持同一时候获取/释放多个资源


    能够看到Semaphore的实现都是基于AQS的方法来作的,单个资源的获取/释放操作都是请求1个资源,所以參数传递的是1,多个资源获取传递了一个int个数。

    public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    public void acquireUninterruptibly() {
            sync.acquireShared(1);
        }
    
    public boolean tryAcquire() {
            return sync.nonfairTryAcquireShared(1) >= 0;
        }
    
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
    public void release() {
            sync.releaseShared(1);
        }
    
    public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
    
    public void acquireUninterruptibly(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireShared(permits);
        }
    
    public boolean tryAcquire(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.nonfairTryAcquireShared(permits) >= 0;
        }
    
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
        }
    
    public void release(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.releaseShared(permits);
        }
    
    

    以下用一个实例来測试一下Semaphore的功能。

    1. 创建一个有两个票据的Semaphore

    2. 创建20个线程来竞争运行race()方法

    3. 在race()方法里先打印一句等待获取资源的话,再获取资源,获得资源后打印一句话,最后释放资源,释放资源前打印一句话

    package com.lock.test;
    
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreUsecase {
    	private Semaphore semaphore = new Semaphore(2);
    	
    	public void race(){
    		System.out.println("Thread " + Thread.currentThread().getName() + " is waiting the resource");
    		semaphore.acquireUninterruptibly();
    		try{
    			System.out.println("Thread " + Thread.currentThread().getName() + " got the resource");
    			try {
    				Thread.sleep(3000);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}finally{
    			System.out.println("Thread " + Thread.currentThread().getName() + " is releasing the resource");
    			semaphore.release();
    		}
    	}
    	
    	public static void main(String[] args){
    		final SemaphoreUsecase usecase = new SemaphoreUsecase();
    		
    		for(int i = 0; i < 10; i++){
    			Thread t = new Thread(new Runnable(){
    
    				@Override
    				public void run() {
    					usecase.race();
    				}
    				
    			}, String.valueOf(i));
    			t.start();
    		}
    	}
    }
    

    測试结果:

    能够看到先来的两个线程先获得了资源,后来的线程都在等待,当有线程释放资源之后,等待的线程才会去获得资源,直到都获得/释放资源

    Thread 0 is waiting the resource
    Thread 0 got the resource
    Thread 2 is waiting the resource
    Thread 2 got the resource
    Thread 1 is waiting the resource
    Thread 4 is waiting the resource
    Thread 3 is waiting the resource
    Thread 5 is waiting the resource
    Thread 6 is waiting the resource
    Thread 7 is waiting the resource
    Thread 8 is waiting the resource
    Thread 9 is waiting the resource
    Thread 2 is releasing the resource
    Thread 0 is releasing the resource
    Thread 1 got the resource
    Thread 4 got the resource
    Thread 1 is releasing the resource
    Thread 4 is releasing the resource
    Thread 3 got the resource
    Thread 5 got the resource
    Thread 3 is releasing the resource
    Thread 5 is releasing the resource
    Thread 6 got the resource
    Thread 7 got the resource
    Thread 7 is releasing the resource
    Thread 6 is releasing the resource
    Thread 8 got the resource
    Thread 9 got the resource
    Thread 8 is releasing the resource
    Thread 9 is releasing the resource
    








  • 相关阅读:
    DripRoad(点滴之路)
    如何写优雅的代码
    .Net 一直在改变
    Protobufnet的完美解决方案
    关于msgpack序列化后的消息包是否再压缩
    失眠
    创建一个比微软性能更好空间更少的GUID
    msgpack与protobuf的简单性能测试对比
    分布式游戏服务器的登陆流程
    对象池的实现与性能测试
  • 原文地址:https://www.cnblogs.com/lcchuguo/p/5177299.html
Copyright © 2011-2022 走看看