zoukankan      html  css  js  c++  java
  • Semaphore源码分析

    Semaphore可以控制并发数量,通过分发许可证的方式,acquire就是去获取许可证,如果获取到了的话,就可以执行,如果获取不到的话,就会去同步队列里阻塞.

    release会释放许可证并唤醒同步队列的线程.

    public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }

    默认是非公平,permits就是AQS中state的值

     public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
    final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }

    可以看出就是通过减少state的值,如果剩下的值小于0的话,直接返回remaining,线程就会进入阻塞队列,大于0的话,就通过CAS改变state的值

    因为并发可能有多个线程来修改state的值,所以通过cas和自璇来保证线程安全

    再来看release方法

    public void release() {
            sync.releaseShared(1);
        }
     public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
     protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }

    同样是通过cas的方式来归还许可证,并通过cas重新设置state的值

    private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

    唤醒等待队列里的第一个节点

  • 相关阅读:
    micro-fusion & macro-fusion
    Intel Core Microarchitecture Pipeline
    Re-Order Buffer
    汇编效率优化:打破依赖链
    汇编效率优化:指令处理机制
    CABAC总结与补充讨论
    差分方程的零输入响应与零状态响应
    线性差分方程的迭代分析法
    二阶线性差分方程中的根/特征值的讨论
    VMware 快速克隆出多个 Linux centos7 环境
  • 原文地址:https://www.cnblogs.com/lzh66/p/13234128.html
Copyright © 2011-2022 走看看