zoukankan      html  css  js  c++  java
  • Java并发包中线程同步器

    一、CountDownLatch

    场景:主线程需要等待所有子线程执行完毕后再进行汇总

    CountDownLatch实现比较简单,继承AQS实现了一个不可重入共享锁Sync

    1.不可重入共享锁Sync

        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
    //尝试获取锁 仅state==0时才能获取成功
    protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
    //尝试释放锁
    protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }

    2.方法

    1)void await()

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);//尝试获取锁,不忽略中断引起的返回
        }

    2)boolean await(long timeout, TimeUnit unit)

        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));//尝试一定时间内获取锁
        }

    3)void countDown()

        public void countDown() {
            sync.releaseShared(1);
        }

    3.实例

    public class CountDownLatchTest {
    
        //定义CountDownLatch  实际创建共享锁  且锁已被两个线程持有 state == 2
        private static CountDownLatch countDownLatch = new CountDownLatch(2);
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(2);
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println("childThreadOne over");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        //线程1释放共享锁,state--
                        countDownLatch.countDown();
                    }
                }
            });
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println("childThreadTwo over");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        //线程2释放共享锁,state--
                        countDownLatch.countDown();
                    }
                }
            });
            System.out.println("wait all child thread over");
            //主线程阻塞, 实际尝试获取共享锁 ,仅state == 0时获取成功或被中断打断引起异常
            countDownLatch.await();
            System.out.println("all child thread over");
            pool.shutdown();
        }
    }

    二、CyclicBarrier回环屏障

    和CountDownLatch场景一样,但是CountDownLatch是一次性的,CyclicBarrier可重复使用;实现方式不同,所以使用方式不同,范围更大,见后面实例

    CyclicBarrier采用独占锁ReentranLock及条件变量trip(阻塞到达屏障的线程)实现

    设置一道屏障,①当线程数小于屏障规定的线程数时,线程入trip条件阻塞队列,线程阻塞;②当线程数等于屏障规定的线程数时,唤醒trip中所有的线程,并重置计数器状态(越过屏障)

    另外CyclicBarrier也不忽略中断引起的返回,会抛出异常,屏障会失效,抛错genetation.barrier = true

    1)变量与构造方法

        /** 独占锁 */
        private final ReentrantLock lock = new ReentrantLock();
        /** 条件变量 */
        private final Condition trip = lock.newCondition();
        /** 屏障阻塞的线程个数 */
        private final int parties;
        /* 突破屏障后执行的任务  默认为空 */
        private final Runnable barrierCommand;
        /** 默认false,当前屏障被中断打破后,设置为true,继续使用屏障会抛出异常BrokenBarrierException */
        private Generation generation = new Generation();
    
        /**
         * 实际计数器  count == 0时突破屏障
         */
        private int count;
    
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }

    2.方法

    1)int dowait(boolean timed, long nanos)

        /**
         * 主要代码
         */
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    //中断引起的跨过屏障,后续await屏障都会抛错
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    //当前线程被中断,唤醒trip的所有阻塞线程,设置g.broken == true,抛出异常
                    breakBarrier();
                    throw new InterruptedException();
                }
                //调用一次数据器-1
                int index = --count;
                //当计数器 == 0时,达到屏蔽的线程数,越过屏障
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            //先执行屏障任务
                            command.run();
                        ranAction = true;
                        //唤醒条件变量中所有线程trip.signalAll();
                        //重置计数器count = parties;
                        //重置版本generation = new Generation();
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            //执行屏障任务抛错时,
                            //依然唤醒所有阻塞线程,
                            //但设置g.barrier == true,后续屏障都会抛错
                            breakBarrier();
                    }
                }
    
                // 当计数器 != 0 时,当前线程入条件阻塞队列
                for (;;) {
                    try {
                        if (!timed)
                            //无限阻塞
                            trip.await();
                        else if (nanos > 0L)
                            //超时阻塞
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

    2)int await()

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }

    3) int await(long timeout, TimeUnit unit)

        public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }

    3.实例

    public class CyclicBarrierTest {
    
        //设置屏障线程数为2  state = 2 
        private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    
        public static void main(String[] args){
            ExecutorService pool = Executors.newFixedThreadPool(2);
    
            pool.submit(new Runnable(){
                @Override
                public void run() {
                    try {
                        System.out.println("thread1 step1");
                        //线程1入trip阻塞队列,state--
                        cyclicBarrier.await();
                        System.out.println("thread1 step2");
                        cyclicBarrier.await();
                        System.out.println("thread1 step3");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            pool.submit(new Runnable(){
                @Override
                public void run() {
                    try {
                        System.out.println("thread2 step1");
                        //线程2入trip阻塞队列,state--
                        //与线程1的step1一起导致state == 0,越过屏障唤醒两个线程,state重新设置为2后续逻辑一致
                        cyclicBarrier.await();
                        System.out.println("thread2 step2");
                        cyclicBarrier.await();
                        System.out.println("thread2 step3");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
            pool.shutdown();
        }
    }

    三、Semaphore

    场景:与CountDownLatch一样

    信号量同步器设计类似于CountDownLatch,不同的是计数器是递增的

    Semaphore不仅实现了公平锁,还实现了非公平锁

     1.共享锁Sync

        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                setState(permits);
            }
    
            final int getPermits() {
                return getState();
            }
    
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    
            protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    
            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) // underflow
                        throw new Error("Permit count underflow");
                    if (compareAndSetState(current, next))
                        return;
                }
            }
    
            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0))
                        return current;
                }
            }
        }
    
    
        /**
         * 非公平锁
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
        /**
         * 公平锁
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
    
            FairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }

    2实例

    public class SemaphoreTest {
    
        //信号量
        private static Semaphore semaphore = new Semaphore(0);
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(2);
    
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("thread1 over");
                        //释放+1
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("thread2 over");
                        //释放+1
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            //同步
            semaphore.acquire(2);
            System.out.println("all child thread over");
            pool.shutdown();
        }
    }

    四、总结

    1.线程同步的设计类似《操作系统原理》中的进程同步,信号量机制,PV操作

    2.CountDownLatch实现线程同步(计数器自减),是一次性的,仅支持公平锁,线程FIFO;

    CyclicBarrier实现线程同步(计数器自减),是可复用的(计数器还原),使用独占锁ReentranLock的条件变量trip的阻塞队列实现。

    Semaphore实现线程同步(计数器自增),也是可以复用的(计数器归0),提供公平锁与非公平锁实现。

  • 相关阅读:
    IDEA开发 Scala 项目
    mvn编译时绕过本地jar去maven仓库下载问题
    三角化(转载)
    分布式文件服务器介绍(转载)
    VSCode 设置侧边栏字体大小
    libLas编译
    OSG编译
    vcpkg.exe安装与应用
    OpenCASCADE编译
    gl2ps编译
  • 原文地址:https://www.cnblogs.com/wqff-biubiu/p/12268512.html
Copyright © 2011-2022 走看看