zoukankan      html  css  js  c++  java
  • 同步工具类

    同步工具类

    阿里巴巴2021版JDK源码笔记(2月第三版).pdf

    链接:https://pan.baidu.com/s/1XhVcfbGTpU83snOZVu8AXg
    提取码:l3gy

    除了锁与 Condition,Concurrent 包还提供了一系列同步工具 类。这些同步工具类的原理,有些也是基于AQS的,有些则需要特殊的实现机制.

    1. Semaphore

    Semaphore也就是信号量,提供了资源数量的并发访问控制,其使用代码很简单

     //初始化10个资源,第2个参数是公平和非公平选项
    Semaphore available = new Semaphore(10,true);
    available.acquire();//每次获取一个,如果获取不到,线程就会阻塞
    available.release();//用完释放
    
    • 假设有n个线程来获取Semaphore里面的资源(n>10),n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到

    • 当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为 如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分

    由于Semaphore和锁的实现原理基本相同,上面的代码不再展开解 释。资源总数即state的初始值,在acquire里对state变量进行CAS减 操作,减到0之后,线程阻塞;在release里对state变量进行CAS加操作。

    2. CountDownLatch

    2.1 CountDownLatch使用场景

    一个主线程要等待10个 Worker 线程工作完毕才退出,就能使用CountDownLatch来实现。

    //初始化为10,没有公平和非公平的概念
    CountDownLatch countDownLatch = new CountDownLatch(10);
    countDownLatch.await();//主线程调用该方法会阻塞在这里
    countDownLatch.countDown();//减 1,挡减为0时,主线程会被唤醒
    

    2.2 await()实现分析

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
     protected int tryAcquireShared(int acquires) {
         return (getState() == 0) ? 1 : -1;
     }
    

    从tryAcquireShared(..)方法的实现来看,只要state!=0,调用await()方法的线程便会被放入AQS的阻塞队列,进入阻塞状态。

    2.3 countDown()实现分析

    public void countDown() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    

    countDown()调用的 AQS 的模板方法 releaseShared(),里 面的 tryReleaseShared(..)被CountDownLatch.Sync重新实现。从 上面的代码可以看出,只有state=0,tryReleaseShared(..)才会返 回true,然后执行doReleaseShared(..),一次性唤醒队列中所有阻塞的线程

    3. CyclicBarrier

    循环屏障

    3.1 CyclicBarrier使用场景

    CyclicBarrier cb = new CyclicBarrier(10);
    cb.await()
    

    10个工程师一起来公司应聘,招聘方式分为 笔试和面试。需要10个人都到了才进行笔试,面试

    3.2 CyclicBarrier实现原理

    public class CyclicBarrier {
    	private final ReentrantLock lock = new ReentrantLock();
        //用于线程互相唤醒
        private final Condition trip = lock.newCondition();
        //总线程数
        private final int parties;
        private final Runnable barrierCommand;
        private Generation generation = new Generation();
        
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    }
    

    当所有的线程被唤醒时,barrierAction被执行。

    3.3 await方法分析

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
    TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
    
            if (Thread.interrupted()) {//响应中断
                breakBarrier();//唤醒所有阻塞的线程
                throw new InterruptedException();
            }
    
            int index = --count; //每个线程调用一次
            if (index == 0) {  // tripped,当调为0时,此线程唤醒所有线程
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();//执行回调方法
                    ranAction = true;
                    nextGeneration();//唤醒所有线程
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // loop until tripped, broken, interrupted, or timed out
            for (;;) { //人没齐,都阻塞在这里
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    
    • CyclicBarrier是可以被重用的。以上一节的应聘场景为 例,来了10个线程,这10个线程互相等待,到齐后一起被唤醒,各自 执行接下来的逻辑;然后,这10个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
    • CyclicBarrier 会响应中断。10 个线程没有到齐,如果有 线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()函数。然后count被重置为初始值(parties),重新开始。
    • 上面的回调函数,barrierAction只会被第10个线程执行1次(在唤醒其他9个线程之前),而不是10个线程每个都执行1次。

    4. Exchanger

    4.1 Exchanger使用场景

    Exchanger用于线程之间交换数据,其使用代码很简单

    4.2 实现原理

    Exchanger的核心机制和Lock一样,也是CAS+park/unpark

    5. Phaser

    5.1 用Phaser替代CyclicBarrier和CountDownLatch

    从JDK7开始,新增了一个同步工具类Phaser,其功能比CyclicBarrier和CountDownLatch更加强大。

    //初始化10
    Phaser phaser = new Phaser(10);
    phaser.awaitAdvance(phaser.getPhase());//主线程调用该方法是,阻塞在这
    phaser.arrive();//每个线程工作完成之后。调用一次arrive
    phaser.arriveAndAwaitAdvance()
    

    5.2 Phaser新特性

    • 动态调整线程个数

       phaser.register();//注册1个
      phaser.bulkRegister(10);//注册10个
      phaser.arriveAndDeregister();//借注册
      
    • 层次Phaser:多个Phaser可以组成树状结构,可以通过在构造函数中传入父Phaser来实现

  • 相关阅读:
    代码中的TODO FIXME XXX 等注释释义
    [转]Linux环境下段错误的产生原因及调试方法小结
    linux 的date命令及系统时间设置
    声明和定义
    [转]关于fork的一个面试题
    [转]头文件定义全局变量等问题
    jQuery帮助之CSS尺寸(五)outerHeight、outerWidth
    ActiveX
    10个免费的javascript富文本编辑器(jQuery and nonjQuery)
    <推荐>65个以自然风光为背景的UI设计
  • 原文地址:https://www.cnblogs.com/steven158/p/15023614.html
Copyright © 2011-2022 走看看