zoukankan      html  css  js  c++  java
  • Semaphore、CountDownLatch、CyclicBarrier、Exchanger、Phaser源码

    Semaphore

    Semaphore最主要功能特点,是控制同时访问特定资源的线程数量。

    其源码实现就是在初始化时传入permits参数,紧接着调用AQS同步器setState方法,将内存中state值设为permits传入的参数值;

    而当要同时访问的线程数, 大于state的值,即remaining小于0时,线程加入同步器中队列进行等待。

    public class Semaphore implements java.io.Serializable {
        abstract static class Sync extends AbstractQueuedSynchronizer {
            Sync(int permits) {
                setState(permits);
            }
        }
        
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
        
        static final class NonfairSync extends Sync {
            NonfairSync(int permits) {
                super(permits);
            }
        }
    }
    

    remaining = available - acquires
    当acquire传入的permits值小于等于state值,更新state值为当前remaining的值
    当acquire传入的值大于state的值,即remaining小于0,走doAcquireSharedInterruptibly逻辑,加入等待队列。

    public class Semaphore implements java.io.Serializable {
        public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
        
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
        
        //非公平锁
        static final class NonfairSync extends Sync {
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
        
        abstract static class Sync extends AbstractQueuedSynchronizer {
           final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }
    }
    
    CountDownLatch

    CountDownLatch功能特点是一个或多个线程等待其他线程完成操作。

    而源码实现是,每当一个线程完成操作,state值减1;最后当所有线程全部完成操作,state==0,tryAcquire返回1,跳出同步器中doAcquireSharedInterruptibly方法中的死循环,进行下一步。

    CountDownLatch的初始设置state值的逻辑同上Semaphore

    public class CountDownLatch {
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
        
        private static final class Sync extends AbstractQueuedSynchronizer {
            Sync(int count) {
                初始化state值
                setState(count);
            }
        }
    }
    

    await方法每次同步器进行state减1

    public class CountDownLatch {
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        
        private static final class Sync extends AbstractQueuedSynchronizer {
            protected int tryAcquireShared(int acquires) {
                // state==0时,跳出死循环
                return (getState() == 0) ? 1 : -1;
            }
        }
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                //只要有一个未到达,就执行里面循环获取锁
                doAcquireSharedInterruptibly(arg);
        }
        
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        //当前置节点是头节点,也即所有都到达,获取锁资源
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    }
    
    CyclicBarrier

    CyclicBarrier等待所有线程都到达屏障,屏障才开门,所有被屏障拦截的线程才会继续运行。功能类似于CountDownLatch,但可以重置计数器、增加barrierAction方法等功能,可以比CountDownLatch实现更复杂的业务。

    源码实现是通过在dowait方法中用ReentrantLock加锁解锁实现,每次调用await方法,进行加锁,同时count值进行减1,当所有线程都到达屏障,如果有barrierAction方法,执行barrierAction方法

    而当超时等条件都满足后进行unlock,否则进行等待陷入死循环中。

        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
        
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
                // 线程计数
                int index = --count;
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        //所有线程都到达屏障,如果有action,执行action
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
    Exchanger

    Exchanger用于进行线程间的数据交换,它提供一个同步点(第一个线程等待第二个线程也执行exchange方法),两个线程可以交换彼此的数据。

    Exchanger里面写了个内部类Participant,Participant继承ThreadLocal,每个线程数据保存在ThreadLocal中。

    更详细源码解释可参考:https://blog.csdn.net/wzl1369248650/article/details/104263283

        public Exchanger() {
            participant = new Participant();
        }
        
        static final class Participant extends ThreadLocal<Node> {
            public Node initialValue() { return new Node(); }
        }
        
        @sun.misc.Contended static final class Node {
            int index;              // Arena index
            int bound;              // Last recorded value of Exchanger.bound
            int collides;           // Number of CAS failures at current bound
            int hash;               // Pseudo-random for spins
            Object item;            // This thread's current item
            volatile Object match;  // Item provided by releasing thread
            volatile Thread parked; // Set to this thread when parked, else null
        }
        
        public V exchange(V x, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
            Object v;
            Object item = (x == null) ? NULL_ITEM : x;
            long ns = unit.toNanos(timeout);
            if ((arena != null ||
                // 数据交换入口
                 (v = slotExchange(item, true, ns)) == null) &&
                ((Thread.interrupted() ||
                  (v = arenaExchange(item, true, ns)) == null)))
                throw new InterruptedException();
            if (v == TIMED_OUT)
                throw new TimeoutException();
            return (v == NULL_ITEM) ? null : (V)v;
        }
    
    Phaser

    参考:https://www.cnblogs.com/tong-yuan/p/11614755.html

    Phaser相对于CyclicBarrier和CountDownLatch的优势:

    1. Phaser可以完成多阶段,而一个CyclicBarrier或者CountDownLatch一般只能控制一到两个阶段的任务;

    2. Phaser可动态注册屏障数,每个阶段的任务数量可以控制,而一个CyclicBarrier或者CountDownLatch任务数量一旦确定不可修改。

    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出。
  • 相关阅读:
    (转)单机上配置hadoop
    整数划分 Integer Partition(二)
    整数划分 Integer Partition(一)
    深入理解计算机系统:信息的处理和表示(二)整数四则运算
    深入理解计算机系统:信息的处理与表示(一)基础
    从《营造法式》为何成书于北宋 谈起
    (转)排列算法 Permutation Generation
    洛谷2971 [USACO10HOL]牛的政治Cow Politics
    洛谷1549 棋盘问题(2)
    洛谷3084 [USACO13OPEN]照片Photo
  • 原文地址:https://www.cnblogs.com/caozibiao/p/14155081.html
Copyright © 2011-2022 走看看