zoukankan      html  css  js  c++  java
  • Java并发编程--Semaphore

    概述

      信号量(Semaphore)控制同时访问资源的线程数量,支持公平和非公平两种方式获取许可。

    使用

      提供的方法

     1 public Semaphore(int permits)    //permits为许可数,默认非公平方式
     2 public Semaphore(int permits, boolean fair)
     3 
     4 //获取一个许可。若获取成功,permits-1,直接返回;否则当前线程阻塞直到有permits被释放,除非线程被中断
     5 //如果线程被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。 
     6 public void acquire() throws InterruptedException
     7 //忽略中断
     8 public void acquireUninterruptibly()
     9 //尝试获取一个许可,成功返回true,否则返回false。
    10 //即使已将此信号量设置为使用公平排序策略,但是调用 tryAcquire() 也将 立即获取许可(如果有一个可用),而不管当前是否有正在等待的线程。
    11 public boolean tryAcquire()
    12 //超时尝试获取一个许可,该方法遵循公平设置
    13 public boolean tryAcquire(long timeout, TimeUnit unit)
    14 //释放一个许可
    15 public void release()
    16 
    17 //以上方法都是获取或释放一个许可,每个方法都存在对应的获取或释放指定个数许可的方法。例如public boolean tryAcquire(int permits)

      使用示例:

        使用信号量实现对内容池(例如线程池)的访问。

     1 class Pool {
     2     private static final int MAX_AVAILABLE = 100;    //许可数为100,在本例中也是内容池的item的个数。
     3     private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
     4 
     5     //获取池中的一个item,首先从Semaphore获取许可;获取许可成功后,从池中获取一个可用的item,并把该item标记为已使用。
     6     public Object getItem() throws InterruptedException {
     7         available.acquire();
     8         return getNextAvailableItem();
     9     }
    10 
    11     //将指定的item释放到池中,如果markAsUnused返回true,释放Semaphore的一个许可。
    12     public void putItem(Object x) {
    13         if (markAsUnused(x))
    14             available.release();
    15     }
    16 
    17     // Not a particularly efficient data structure; just for demo
    18 
    19     protected Object[] items = ... whatever kinds of items being managed    //内容池,例如:连接池,每个item代表一个连接。
    20     protected boolean[] used = new boolean[MAX_AVAILABLE];    //标记池中的每个item是否已经被占用
    21 
    22     protected synchronized Object getNextAvailableItem() {
    23         for (int i = 0; i < MAX_AVAILABLE; ++i) {
    24             if (!used[i]) {
    25                 used[i] = true;
    26                 return items[i];
    27             }
    28         }
    29         return null; // not reached
    30     }
    31 
    32     protected synchronized boolean markAsUnused(Object item) {
    33         for (int i = 0; i < MAX_AVAILABLE; ++i) {
    34             if (item == items[i]) {
    35                 if (used[i]) {
    36                     used[i] = false;
    37                     return true;
    38                 } else
    39                     return false;
    40             }
    41         }
    42         return false;
    43     }
    44 }

    实现原理

      基于AQS实现,用同步状态(state)表示许可数(permits),使用AQS的共享式获取和释放同步状态来实现permits的获取和释放。

      域

    1 private final Sync sync;

        Sync是Semaphore的抽象内部类,继承了AQS。它有两个子类NonfairSync和FairSync,分别是非公平同步器和公平同步器。

        Sync的源码:

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
    
        Sync(int permits) {
            setState(permits);
        }
    
        final int getPermits() {
            return getState();
        }
        
        //非公平共享式获取同步状态。自旋CAS获取同步状态直到成功或许可不足。
        //返回值语义:负数代表获取失败、0代表获取成功但没有剩余资源、正数代表获取成功,还有剩余资源。
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining)) //此处CAS时有可能同步队列中已有等待的线程,就导致的不公平性
                    return remaining;
            }
        }
        
        //自定义共享式释放同步状态。自旋CAS释放同步状态直到成功,除非overflow
        //返回值语义:true表示成功,不可能释放失败,除非overflow
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
    
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

        NonfairSync(非公平)源码:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
    
        NonfairSync(int permits) {
            super(permits);
        }
    
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

        FairSync(公平)源码:

     1 static final class FairSync extends Sync {
     2     private static final long serialVersionUID = 2014338818796000944L;
     3 
     4     FairSync(int permits) {
     5         super(permits);
     6     }
     7     //自定义共享式释放同步状态。
     8     protected int tryAcquireShared(int acquires) {
     9         for (;;) {
    10             //首先检查同步队列中是否有前驱。如果有则返回失败,将当前线程加入到同步队列的尾部,保证先尝试获取同步状态的线程先成功。
    11             if (hasQueuedPredecessors())
    12                 return -1;
    13             int available = getState();
    14             int remaining = available - acquires;
    15             if (remaining < 0 ||
    16                 compareAndSetState(available, remaining))
    17                 return remaining;
    18         }
    19     }
    20 }

      方法

        除tryAcquire外,都是通过调用AQS提供的方法实现获取失败时的阻塞和唤醒机制,具体策略建AQS的源码。

     1 //阻塞式获取一个许可,响应中断
     2 public void acquire() throws InterruptedException {
     3     sync.acquireSharedInterruptibly(1);    //调用AQS提供的可响应中断共享式获取同步状态方法
     4 }
     5 
     6 //阻塞式获取一个许可,忽略中断
     7 public void acquireUninterruptibly() {
     8     sync.acquireShared(1);
     9 }
    10 
    11 //非阻塞式获取一个许可
    12 public boolean tryAcquire() {
    13     return sync.nonfairTryAcquireShared(1) >= 0;
    14 }
    15 
    16 //释放一个许可
    17 public void release() {
    18     sync.releaseShared(1);
    19 }

    参考资料

      JDK DOC

      《Java并发编程的艺术》

  • 相关阅读:
    关于PowerShell调用Linq的一组实验
    PowerShell创建参考窗口
    Python切图脚本
    11->8
    用Python演奏音乐
    关于Haskell计算斐波那契数列的思考
    傅立叶变换与小波分析
    堆排序(python实现)
    二进制数据表示方式
    oracle数据插入/查询乱码
  • 原文地址:https://www.cnblogs.com/zaizhoumo/p/7787090.html
Copyright © 2011-2022 走看看