zoukankan      html  css  js  c++  java
  • JAVA并发-Semaphore

    案例

    /**
     * @author qhong
     * @date 2019/12/13 10:59
     * https://www.cnblogs.com/dolphin0520/p/3920397.html
     * 信号量
     * 一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用
     **/
    public class SemaphoreDemo {
    	public static void main(String[] args) {
    		int N = 8;            //工人数
    		Semaphore semaphore = new Semaphore(5); //机器数目
    		for(int i=0;i<N;i++)
    			new Worker(i,semaphore).start();
    	}
    
    	static class Worker extends Thread{
    		private int num;
    		private Semaphore semaphore;
    		public Worker(int num,Semaphore semaphore){
    			this.num = num;
    			this.semaphore = semaphore;
    		}
    
    		@Override
    		public void run() {
    			try {
    				semaphore.acquire();
    				System.out.println("工人"+this.num+"占用一个机器在生产...");
    				Thread.sleep(2000);
    				System.out.println("工人"+this.num+"释放出机器");
    				semaphore.release();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    
    

    output:

    工人0占用一个机器在生产...
    工人2占用一个机器在生产...
    工人1占用一个机器在生产...
    工人3占用一个机器在生产...
    工人5占用一个机器在生产...
    工人0释放出机器
    工人3释放出机器
    工人1释放出机器
    工人2释放出机器
    工人6占用一个机器在生产...
    工人4占用一个机器在生产...
    工人7占用一个机器在生产...
    工人5释放出机器
    工人6释放出机器
    工人4释放出机器
    工人7释放出机器
    

    原理分析

    Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。

    Sync

       final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    

    这里就记录Sync的两个方法

    NonfairSync

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    

    FairSync

     static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
    
            FairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }
    

    操作流程分析

    初始化

    直接修改state为具体许可数

    acquire(获取许可)

    信号量获取一个或多个许可,在未获取前,线程会一直阻塞

       public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
        
          public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    tryAcquireShared

    分FairSync与NonfairSync

    FairSync:

      protected int tryAcquireShared(int acquires) {
                for (;;) {
                    //这里多了一步判断,是否存在应该先于当前线程获得锁的线程
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    

    这里再说一下hasQueuePredecessors

    public final boolean hasQueuedPredecessors() {
            Node t = tail;  //尾节点
            Node h = head;  //头节点
            Node s;
    
            //头节点 != 尾节点
            //同步队列第一个节点不为null
            //当前线程是同步队列第一个节点
            return h != t &&
                    ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    我们发现,公平锁与非公平锁对tryAcquireShared方法的实现的唯一区别就是公平锁首先会判断是否存在应该先于当前线程获得锁的线程,如果存在说明当前线程不是下一个应该获取锁的线程。hasQueuedPredecessors方法主要确认以下几种情况:

    • 等待队列为空
    • 当前线程所在的节点是头结点
    • 当前线程所在的节点是头结点的后继节点

    NonfairSync:

       final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
    

    这里代码很明了,就是如果remaining为负数,直接返回,反之就设置state为remaining

    doAcquireSharedInterruptibly

    此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
             // 添加共享锁类型节点到队列中
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 再次尝试获取共享锁
                        int r = tryAcquireShared(arg);
                        // 如果在这里成功获取共享锁,会进入共享锁唤醒逻辑
                        if (r >= 0) {
                            // 共享锁唤醒逻辑
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    // 与独占锁相同的挂起逻辑
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    addWaiter(Node.SHARED),这里将会新增两个node

    第一轮循环创建一个new Node(),空节点,线程也为空

    第二轮将Node.SHARED加入到队列中,prev指向head

    在最后一次release之前,tryAcquireShared会为-1

    shouldParkAfterFailedAcquire

       private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    parkAndCheckInterrupt(阻塞线程)

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    

    release(释放线程)

    release

      public void release(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.releaseShared(permits);
        }
    

    releaseShared

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {// 尝试释放资源
            doReleaseShared();// 唤醒后继结点
            return true;
        }
        return false;
    }
    

    此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

    tryReleaseShared

        protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    

    doReleaseShared

    此方法主要用于唤醒后继。下面是它的源码:

       private void doReleaseShared() {
        /*
         * 如果head需要通知下一个节点,调用unparkSuccessor
         * 如果不需要通知,需要在释放后把waitStatus改为PROPAGATE来继续传播
         * 此外,我们必须通过自旋来CAS以防止操作时有新节点加入
         * 另外,不同于其他unparkSuccessor的用途,我们需要知道CAS设置状态失败的情况,
         * 以便进行重新检查。
         */
        for (;;) {
            //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
            //其实就是唤醒上面新获取到共享锁的节点的后继节点
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //表示后继节点需要被唤醒
                if (ws == Node.SIGNAL) {
                    //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;           
                    //执行唤醒操作
                    unparkSuccessor(h);
                }
                //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            //如果头结点没有发生变化,表示设置完成,退出循环
            //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
            if (h == head)                   
                break;
        }
    }
    

    释放锁继续acquire运行

    setHeadAndPropagate

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;//记录当前头节点
        //设置新的头节点,即把当前获取到锁的节点设置为头节点
        //注:这里是获取到锁之后的操作,不需要并发控制
        setHead(node);
        //这里意思有两种情况是需要执行唤醒操作
        //1.propagate > 0 表示调用方指明了后继节点需要被唤醒
        //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒
            //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    

    此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!这样,形成了一个唤醒链,直到写锁的节点就停止。

    doReleaseShared

    private void doReleaseShared() {
      for (;;) {
        // 从头节点开始执行唤醒操作
        // 这里需要注意,如果从setHeadAndPropagate方法调用该方法,那么这里的head是新的头节点
        Node h = head;
        if (h != null && h != tail) {
          int ws = h.waitStatus;
          //表示后继节点需要被唤醒
          if (ws == Node.SIGNAL) {
            // 初始化节点状态
            //这里需要CAS原子操作,因为setHeadAndPropagate和releaseShared这两个方法都会顶用doReleaseShared,避免多次unpark唤醒操作
            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
              // 如果初始化节点状态失败,继续循环执行
              continue;            // loop to recheck cases
            // 执行唤醒操作
            unparkSuccessor(h);
          }
          //如果后继节点暂时不需要唤醒,那么当前头节点状态更新为PROPAGATE,确保后续可以传递给后继节点
          else if (ws == 0 &&
                   !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
            continue;                // loop on failed CAS
        }
        // 如果在唤醒的过程中头节点没有更改,退出循环
        // 这里防止其它线程又设置了头节点,说明其它线程获取了共享锁,会继续循环操作
        if (h == head)                   // loop if head changed
          break;
      }
    }
    

    共享锁的释放锁逻辑比独占锁的释放锁逻辑稍微复杂,原因是共享锁需要释放队列中所有共享类型的节点,因此需要循环操作,由于释放锁过程中会涉及多个地方修改节点状态,此时需要 CAS 原子操作来并发安全。

    总结

    非公平与公平

    非公平Semaphore信号量对于任何申请许可的线程来说,都是第一时间看是否有多余的许可,如果有则给此线程,如果没有则进队列排队等待,而不是此线程直接进AQS队列排队等待按顺序来拿到许可,利用此间隙来分配许可可以提高并发量。但是会引发一个问题:越活跃的线程越能够拿到许可,造成“饥渴死”现象。

    图1(setHeadAndPropagate)

    有一个方法很重要:setHeadAndPropagate。它除了重新标记head指向的节点外,还有一个重要的作用,那就是propagate(传递),也就是共享的意思。

    用图举个例子:


    因为线程B的读锁无法直接获得锁,所以需要在Sync队列中等待,导致后面其他线程的读锁都得等待。

    当线程A的读锁释放后,线程B的写锁获得锁,当它释放后,线程B的读锁会获取到锁,并传递给后面的节点,传递的事情就是在setHeadAndPropagate里做的,我们来看看它是如何做的。

    图2

    参考:

    【java并发核心一】Semaphore 的使用思路

    Java多线程进阶(二十)—— J.U.C之synchronizer框架:Semaphore

    【JUC】JDK1.8源码分析之Semaphore(六)

    Semaphore源码解析

    Java并发之AQS源码分析(二)

    【细谈Java并发】谈谈AQS

  • 相关阅读:
    js-link下载文件
    sql-优化建议
    Studio-环境变量设置
    Studio
    Docker下安装ElasticSearch和Kibana
    sklearn 中的 r2_score
    R语言将所有列数据正交化/缩放
    R语言 random forests out-of-bag prediction
    R语言 coalesce 函数
    R语言 case_when 函数
  • 原文地址:https://www.cnblogs.com/hongdada/p/12091324.html
Copyright © 2011-2022 走看看