zoukankan      html  css  js  c++  java
  • J.U.C-三剑客[semaphoreCyclicBarrierCountDownLatch]

    一.semaphore信号量,底层也是基于AQS
    使用:
    /**
    * 可以理解为控制某个资源最多有多少个线程同时执行,(比如洗手间,并行与排队)
    * 如果满了只能等待直到其它资源释放(可以理解为并发量控制)
    * @author Binglong
    * @date 2018-11-12
    */
    public class SemaphoreUtils {
        public static void main(String[] args) {
            final int SH_SIZE = 10;
            Semaphore semaphore = new Semaphore(SH_SIZE);
            final int TH_NUM = 20;
            for (int i = 0; i < TH_NUM; i++) {
                ThreadPoolUtils.getSingle().threadPoolDo(new TaskSemaphore(semaphore));
            }
        }
    }
     
    class TaskSemaphore implements Runnable {
        private Semaphore semaphore;
     
        TaskSemaphore(Semaphore semaphore) {
            this.semaphore = semaphore;
        }
     
        public void run() {
            String threadName = Thread.currentThread().getName();
            try {
                this.semaphore.acquire();
                System.out.println(threadName + ":occupy...");
                Thread.sleep(new Random().nextInt(10000));
                System.out.println(threadName + ":over...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
        //注意一点要放到finally
                semaphore.release();
            }
        }
    }
     

    源码

    package java.util.concurrent;
    import java.util.*;
    import java.util.concurrent.locks.*;
    import java.util.concurrent.atomic.*;
    
    public class Semaphore implements java.io.Serializable {
        private static final long serialVersionUID = -3222578661600680210L;
        
        //定义一个内部类
        private final Sync sync;
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                setState(permits);
            }
    
            final int getPermits() {
                return getState();
            }
    
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    
            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    if (compareAndSetState(current, next))
                        return;
                }
            }
    
            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }
        }
    
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
    
            FairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }
    
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
        
        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        public void acquireUninterruptibly() {
            sync.acquireShared(1);
        }
    
        public boolean tryAcquire() {
            return sync.nonfairTryAcquireShared(1) >= 0;
        }
        
        public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        public void release() {
            sync.releaseShared(1);
        }
        
        public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
        
        public void acquireUninterruptibly(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireShared(permits);
        }
    
        public boolean tryAcquire(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.nonfairTryAcquireShared(permits) >= 0;
        }
    
        public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
        }
    
        public void release(int permits) {
            if (permits < 0) throw new IllegalArgumentException();
            sync.releaseShared(permits);
        }
    
        public int availablePermits() {
            return sync.getPermits();
        }
    
        public int drainPermits() {
            return sync.drainPermits();
        }
    
        protected void reducePermits(int reduction) {
            if (reduction < 0) throw new IllegalArgumentException();
            sync.reducePermits(reduction);
        }
    
        public boolean isFair() {
            return sync instanceof FairSync;
        }
    
        public final boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
        public final int getQueueLength() {
            return sync.getQueueLength();
        }
    
        protected Collection<Thread> getQueuedThreads() {
            return sync.getQueuedThreads();
        }
    
        public String toString() {
            return super.toString() + "[Permits = " + sync.getPermits() + "]";
        }
    }
    二、CyclicBarrier
    使用:
    public static void main(String[] args) {
       //创建一个CyclicBarrier并在主线程上new一个任务
    final int N = 5; final CyclicBarrier cyclic = new CyclicBarrier(N, new Runnable() { public void run() {
            //主线程任务(等最后一个线程做完)
    try { System.out.println("汇总计算开始"); Thread.sleep(Math.abs(10)); System.out.println("汇总计算完成"); } catch (Exception e) { e.printStackTrace(); } } }); for (int i = 0; i < N; i++) { final int t = i; new Thread(new Runnable() { public void run() {
             //每个线程任务做完等待(主线程做完才继续往下走)
    try { System.out.println(t + "中心数据已计算开始"); Thread.sleep(Math.abs(new Random().nextInt() % 10000)); System.out.println(t + "中心数据已计算结束"); cyclic.await(); System.out.println(t + "中心数据退出"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } } 0中心数据已计算开始 3中心数据已计算开始 4中心数据已计算开始 2中心数据已计算开始 4中心数据已计算结束 1中心数据已计算开始 1中心数据已计算结束 3中心数据已计算结束 2中心数据已计算结束 0中心数据已计算结束 汇总计算开始 汇总计算完成 0中心数据退出 1中心数据退出 4中心数据退出 2中心数据退出 3中心数据退出
    源码:
    1.构造方法
    
    //只是做等待parties个线程(没有主线程任务)
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    //等待parties个线程后,先完成barrierAction的run方法,其它线程继续执行
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
     
    package java.util.concurrent;
    import java.util.concurrent.locks.*;
    
    
    public class CyclicBarrier {
    
        private static class Generation {
            boolean broken = false;
        }
    
        /** The lock for guarding barrier entry */
        private final ReentrantLock lock = new ReentrantLock();
        /** Condition to wait on until tripped */
        private final Condition trip = lock.newCondition();
        /** The number of parties */
        private final int parties;
        /* The command to run when tripped */
        private final Runnable barrierCommand;
        /** The current generation */
        private Generation generation = new Generation();
    
        private int count;
    
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    
        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
    
        private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
         //获取锁 lock.lock();
    try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); } public int getParties() { return parties; } public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
     
     
     
    2.重要方法
    a.wait()方法,当调用wait()方法的线程数量,达到CyclicBarrier构造方法的N时,(CyclicBarrier在构造方法的Runnable barrierAction,方法完成后,当前线程继续执行)
    在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
    当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
    当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。
    其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
    其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
    其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行
    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
     
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
     
    b.getParties()获取CyclicBarrier等待的线程数,也就是CyclicBarrier构造方法参数parties的值
     
    c.getNumberWaiting() how many thread wait now
     
    d.rest()
    如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
    将是否破损标志位broken置为false。
     
    三、CountDownLatch
    使用:
    /**
    * countDownLatch.countDown()调用一次减一,到0时,其它await方法继续往下执行
    * 可以做并发开关(把SIZE设置为1,通过主线程来countDown(),其它线程都调用await()方法)
    * @author Binglong
    * @date 2018-11-12
    */
    public class CountDownLatchUtils {
        public static void main(String[] args) throws InterruptedException {
            final int SIZE = 20;
            CountDownLatch countDownLatch = new CountDownLatch(SIZE);
            for (int i = 0; i < SIZE; i++) {
                ThreadPoolUtils.getSingle().threadPoolDo(new TaskCountDownLatch(countDownLatch));
            }
            System.out.println("waiting.....");
    //        Thread.sleep(10000);
    //        countDownLatch.countDown();
        }
    }
     
    class TaskCountDownLatch implements Runnable {
     
        private CountDownLatch countDownLatch;
     
        TaskCountDownLatch(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
     
        public void run() {
            String name = Thread.currentThread().getName();
            try {
                System.out.println(name + ":waiting.."+countDownLatch.getCount());
                //等待一定数量任务继续执行
                Thread.sleep(new Random().nextInt(10000));
                countDownLatch.countDown();
                System.out.println(name + ":over...");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
     
    1.使用
    三个方法
    CountDownLatch(int count):构造器中的计数值(count)。
     
    void await() :会一直阻塞当前线程,直到计时器的值为0
     
    void countDown():计数减一
     
     
     
     
    2.原理
    CountDownLatch源代码是有内部类Sync实现,而Sync是继承AQS(抽象队列同步器)
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
     
        Sync(int count) {
            setState(count);
        }
     
        int getCount() {
            return getState();
        }
     
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
     
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
     
    //构造器
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
     
    //countDown方法
    public void countDown() {
        //releaseShared方法是抽象队列同步器的方法
        sync.releaseShared(1);
    }
     
    //await方法
    public void await() throws InterruptedException {
        //acquireSharedInterruptibly方法是抽象队列同步器的方法
        sync.acquireSharedInterruptibly(1);
    }
     
     
  • 相关阅读:
    代理模式之动态代理
    代理模式之静态代理
    基于Java类进行配置Spring
    Spring使用注解开发
    Spring的自动装配
    Bean的作用域
    Spring配置
    最全总结 | 聊聊 Python 办公自动化之 Excel(上)
    最全总结 | 聊聊 Python 数据处理全家桶(MongoDB 篇)
    最全总结 | 聊聊 Python 数据处理全家桶(Redis篇)
  • 原文地址:https://www.cnblogs.com/nedhome/p/10315759.html
Copyright © 2011-2022 走看看