zoukankan      html  css  js  c++  java
  • 多线程七 AQS

    一 . 简介AQS

    AQS简介

    • 在同步组件的实现中,AQS是核心部分,同步组件的实现者,通过使用AQS提供的模板方法 实现同步组件语义
    • AQS实现了对同步状态的管理以及阻塞线程进行排队,等待通知等等一系列底层的实现处理
    • AQS核心:使用Node实现同步队列,底层是个双向链表,可以用于同步锁或者其他同步装的基础框架

    AbstractQueuedSynchronized,虽然类名开头是Abstract,但是他不是抽象类,意义就是说,单独使用它是没有意义的,依赖他去实现同步组件才有意义--相当于没模板方法模式

    • 子类通过继承并实现他的方法,管理其状态 acquire和release
    • 可以同时实现排它锁和共享锁,站在使用者的角度看,它可以帮我们完成两件事,独占控制和共享控制,它的所有子类中要么实现重写了它独占功能的API,要么使用的是共享功能的API,而不会同时使用两套API,即使是他最有名的实现类ReentrantLock,也是通过两个内部类,分别使用者两套API

    AQS实现的大致思路,它内部有一个双向的链表,链表的每一个节点都是一个Node的结构,线程会来尝试的获取锁,如果失败了那么它就将当前线程包装成一个Node节点,加入到同步队列中,前一个节点释放锁后,唤醒自己的后继节点,它实现的依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器
    此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础

    以下类都是依赖AQS是实现的:

    • ReentrantLock
    • ReentrantReadWritrLock.ReadLock
    • ReentrantReadWriteLock.WriteLock

    二 . 使用AQS实现自己的锁

    自己的锁肯定要去实现lock接口,重写里面的方法,怎么重写呢?使用AQS,将AQS作为内部的帮助器类,重写里面的tryAcquir和tryRelease方法,因为lock()我们采用帮助器acquire的方法实现,而此方法会至少调用一次tryAcquire,同理,释放锁,我们重写帮助器的tryRelease,,,,我们只是简单的使用一下,完成加锁,释放锁,锁重入即可
    他面临着两个问题

    1. 怎么知道来拿锁的线程是上一个拿到锁的线程

      • 判断if(当前线程==持有锁的线程){state++;}要求计数器自增
    2. 怎么释放掉锁

      • 重复n次拿到了锁,要求计数器依次减下去
    
    public class AQSDemo01 implements Lock {
    private Helper helper = new Helper();
    
    private class Helper extends AbstractQueuedSynchronizer {
    
        @Override
        protected boolean tryAcquire(int arg) {
            int state = getState();
            Thread t = Thread.currentThread();
    
            if (state == 0) {
                //如果当前状态的值等于预期的值,就把 当前状态的中修改成 arg的值...... ( 刚才掉坑了, compareAndSerState方法,会帮助我们去对比 手动输进去的0 和 当前的状态)
                if (compareAndSetState(0, arg)) {
                    System.out.println("线程来了"+Thread.currentThread().getName()+"   arg=="+arg);
                  setExclusiveOwnerThread(t);
                    return true;
               }
            } else if (getExclusiveOwnerThread() == t) {
                setState(state + 1);
                return true;
            }
            return false;
        }
    
        @Override
        protected boolean tryRelease(int arg) {
    
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new RuntimeException();
            }
            int state = getState() - arg;
    
            boolean flag = false;
    
            if (state == 0) {
                setExclusiveOwnerThread(null);
                flag = true;
    
            }
            setState(state);
            return flag;
        }
    
    
        Condition newCondition() {
            return new ConditionObject();
        }
    }
    
    @Override
    public void lock() {
        helper.acquire(1);
    }
    
    @Override
    public void lockInterruptibly() throws InterruptedException {
        helper.acquireInterruptibly(1);
    }
    
    @Override
    public boolean tryLock() {
        return helper.tryAcquire(1);
    }
    
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return helper.tryAcquireNanos(1, unit.toNanos(time));
    }
    
    @Override
    public void unlock() {
        helper.release(1);
    }
    
    @Override
    public Condition newCondition() {
        return helper.newCondition();
    }
    }
    

    测试类如下,锁正常

    public class textAQS {
    private int value=0;
    private AQSDemo01 lock = new AQSDemo01();
    public int next() {
       lock.lock();
    
        try {
            Thread.sleep(300);
            return value++;
        } catch (InterruptedException e) {
            e.printStackTrace();
           throw new RuntimeException();
        } finally {
            lock.unlock();
        }
    }
    
    /*
     * 经典的验证 锁的重复问题 ,在单一的线程下, a()  想在  未释放锁  的前提下  调用b(),前提就是可冲入锁
     * */
    public void a() {
        lock.lock();
        System.out.println("a");
        b();
        lock.unlock();
    }
    
    public void b() {
        lock.lock();
        System.out.println("b");
        lock.unlock();
    }
    
    public static void main(String[] args) {
    
        textAQS m = new textAQS();
        //测试可重入
       new Thread(new Runnable() {
    
            @Override
            public void run() {
              m.a();
            }
        }).start();
    
        System.out.println("主线程=="+Thread.currentThread().getName());
    
        ExecutorService executorService = Executors.newCachedThreadPool();
    
    
        //从线程池拿出四条线程执行next任务,查看结果是否同步,同步
         for (int i=0;i<4;i++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    while(true)
                        System.out.println(Thread.currentThread().getName()+"  "+m.next());
                }
            });
    
        }
    
    }
    
    }
    

    AQS的同步组件

    1. CountDownLatch

    • 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,
      所以在当前计数到达零之前,await 方法会一直受阻塞。当调用了一定次数的CountDown()是计数器的值为零后,会释放所有等待的线程
      ,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置
    • 它的典型使用场景就是分布计算
    • 打个比方

    课代表等所有同学交完作业再交给老师,
    1: 课代表等待所有的同学(线程)交作业 CountDownLatch cdl = new CountDownLatch(int 学生数)
    2: 单个学生交完作用, cdl.countDown() --> 学生数减一
    3: 主线程: cdl.await() 只要学生数不为零, 就等待

    public class countDownLatch02 {
    private static int []nums;
    
    public countDownLatch02(int line){
        nums=new int[line];
    }
    
    //分隔字符串数组,完成 当前行  (  一行 ) 相加
    public  void colculate(String s ,int index,CountDownLatch count){
        System.out.println("单行线程开始执行..  "+Thread.currentThread().getName());
        String[] s1 = s.split(",");
        int total=0;
        for (String s2:s1) {
            int i = Integer.parseInt(s2);
            total+=i;
        }
        nums[index]=total;
    
        System.out.println(Thread.currentThread().getName() +"线程   计算结果是=="+total);
        count.countDown();
    }
    
    
    //分别计算没行的总值
    public   void sum(){
        System.out.println("加总线程开始执行...");
        int total =0;
        for(int i=0;i<nums.length;i++){
            total+=nums[i];
        }
        System.out.println("执行的结果是=="+total);
    }
    
    
    public static void main(String[] args) throws IOException, InterruptedException {
    
        // 根据行数,
        List<String> contents = readFile();
        int size= contents.size();
        CountDownLatch c = new CountDownLatch(size);
    
        countDownLatch02 latch = new countDownLatch02(size);
    
        System.out.println("zhuxianc");
        // //创建出相应数目的线程,
        for(int i=0;i<size;i++){
            final int j =i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    latch.colculate(contents.get(j),j,c);
                }
            }).start();
        }
        //在主线程中加总
        // System.out.println("当前活跃的实现数"+Thread.activeCount());
       /* while((Thread.activeCount())>2){  //自旋,等待其他线程执行完..
            System.out.println("当前活跃的实现数"+Thread.activeCount());
        }*/
        c.await();
        latch.sum();
    }
    
    /*
     * 读取文件,将每一行存放进list数组...
     * */
    public static List<String> readFile() throws IOException {
        List<String> list = new ArrayList<>();
        String line=null;
        BufferedReader bufferedReader = new BufferedReader(new FileReader("D:\SETextMaven\textcountDownLatch.txt"));
        while ((line = bufferedReader.readLine())!=null){
            list.add(line);
        }
        return list;
    }
    
    }
    

    2. Semaphore

    • 常常作用于仅能提供有限访问的资源,比如项目中使用到的数据库的连接数,可能最大只有20,但是外界的并发量却很庞大,所以我们可以使用Semaphore进行控制,当它把并发数控制到1时,和单线程很相似
    • 他可以很容易的控制同一时刻,并发访问某一个资源被的线程数,使用起来也很简单,对需要进行并发控制的代码用 semaphore.acquire()和 semaphore.release(); 包裹起来即可
    /*
    * 字面意思:  信号量        --> Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目
    * 作用: 用来控制同时访问某些特定资源的线程数量,协调各个线程合理使用公共资源
    * 简介: Semaphore 可以用来维护当前访问自身的线程个数,并提供了同步机制,比如实现一个文件的允许的并发数
    * 应用场景:
    *       开启 30条线程 把 一万个文件的内容读取到内存
    *       使用Semaphore允许10个线程可以并发执行,将内存中的数据写回数据库
    * 
    *       模拟高并发
    * */
    public class semaphore {
    public static void main(String[] args) {
        final int tNum =30;
      // ExecutorService executorService;
      //  executorService = new Executors.newFixedThreadPool();
        Semaphore semaphore = new Semaphore(5);//允许一次性允许 并发执行的线程数
    
        for (int i=0;i<100;i++){
            new Thread(()->{
                try {
                    semaphore.acquire();  //当前线程获取 Semaphore 的 许可证
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"开始任务");
    
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                semaphore.release();       // 归还许可证
    
            }).start();
        }
    }
    }
    

    3 . CyclicBarrier

    • 和CountDownLanch不同的是,它描述的是所有的线程相互等待的过程
      构造方法,参数为parties - 在启动 barrier 前必须调用 await() 的线程数
    • 注意点,如果 传入的参数为6,而所有线程一共才五条,那么主线程和 子线程,将永远处于等待状态,因为没有第六条线程执行 await方法... 或者, 线程在执行await()之前,出现异常, 屏障永远不会被满足

    实例代码:

     
    /*
    * 它允许一组线程相互等待,直到达到某个公共的屏障点,
    * 也就是说,所有的线程必须相互等待,-->  直到所有线程都 满足屏障的要求--> 执行后面的任务
    *
    *   开会:
    *                                       await()          屏障     后续的任务
    *       公司里的所有人都去开会,--> 先到的人等待迟到的人--> 人到齐了,开会....
    * */
    
    /*
    *   应用场景,多线程计算数据,最后合并计算结果
    * */
    
    /*
    * 模拟开会...
    * */
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrier01 {
    
    public void meeting(CyclicBarrier cyclicBarrier){
    System.out.println(Thread.currentThread().getName()+"到达会议室...");
    
    try {
        cyclicBarrier.await();  //此线程等待..
        System.out.println(Thread.currentThread().getName()+"准备开会..");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
    }
    
    public static void main(String[] args) {
    
        CyclicBarrier01 c = new CyclicBarrier01();
        //  构造函数1:
        // 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,
        // 但它不会在启动 barrier 时执行预定义的操作。
        //参数:
        //parties - 在启动 barrier 前必须调用 await() 的线程数
        //抛出:
        //IllegalArgumentException - 如果 parties 小于 1
        //注意点:
        //      如果 传入的参数为6,而所有线程一共才五条,那么主线程和 子线程,将永远处于等待状态,因为没有第六条线程执行 await方法...
        //             或者, 线程在执行await()之前,出现异常,  屏障永远不会被满足
        CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
          for (int i =0;i<5;i++) {  //开启五条线程...
              new Thread(new Runnable() {
                  @Override
                  public void run() {
                      c.meeting(cyclicBarrier);
                  }
              }).start();
          }
    
    
        // 主线程
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("人都到齐了,开会...");
    
     //   cyclicBarrier.reset();
    }
    }
    
    

    带 Runable, 当所有预期的线程都await后,先执行Runable里面的任务

    public class CyclicBarrier02 {
    
    public void meeting(CyclicBarrier cyclicBarrier){
    System.out.println(Thread.currentThread().getName()+"到达会议室...");
    
    try {
        cyclicBarrier.await();  //此线程等待..
        System.out.println(Thread.currentThread().getName()+"听领导讲话...");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
    }
    
    public static void main(String[] args) {
    
        CyclicBarrier02 c = new CyclicBarrier02();
        //  构造函数2:
    /*
        创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,
        并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
    
        参数:
        parties - 在启动 barrier 前必须调用 await() 的线程数
        barrierAction - 在启动 barrier 时执行的命令;如果不执行任何操作,则该参数为 null
        抛出:
    */
    
        CyclicBarrier cyclicBarrier = new CyclicBarrier(6, new Runnable() {
            @Override
            public void run() {
    
                System.out.println("开始开会...");
            }
        });
          for (int i =0;i<5;i++) {  //开启五条线程...
              new Thread(new Runnable() {
                  @Override
                  public void run() {
                      c.meeting(cyclicBarrier);
                  }
              }).start();
          }
        // 主线程
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("主线程也 await了 ...   人都到齐了,开会...");
        cyclicBarrier.reset();
    
    }
    }
    
    

    4. J.U.C同步组件 FutrueTask

    在 多线程二 基本技能中有详细的将讲解,使用

    5 J.U.C 同步组件Fork/Join框架

    • ForkJoin是java7提供的并行执行任务的框架
    • 它的设计思路是把一个大人物Fork成若干个小任务分布计算,然后Join这些子任务的结果,最终得到这个大任务的结果,使用工作窃取算法也就是某个线程从其他的线程的工作队列(双端队列,来窃取的线程从这个队列的尾部取任务,减少竞争)里面窃取任务执行

    局限性:

    • 假如说双端任务队列里面就一个任务,那么肯定就出现竞争,而且还有多开辟线程的开销
    • 只能使用Fork和Join去同步,如果使用了别的同步机制.那么同步线程就不能去窃取执行其他任务
    • 工作队列里面的任务不应该是IO操作
    • 任务不能抛出检查异常,它必须通过必要的代码去处理他们
  • 相关阅读:
    互联网流媒体直播点播平台报ioutil.WriteFile错误导致文件只读如何处理?
    互联网直播点播平台go语言搭建重定向和反向代理的区别及使用
    互联网视频直播点播平台EasyDSS如何集成流媒体平台调取登录及上传接口?
    互联网直播点播平台如何联合RTMP推流摄像头构建智慧消防方案?
    RTMP推流网关如何实现摄像头微信幼儿园直播?
    国标GB28181流媒体服务器gorm框架内表明重复添加前缀排查
    国标GB28181流媒体平台EasyGBS新版界面视频流更新时间显示错误问题解决
    视频流媒体播放器EasyPlayer-RTSP-Android 如何随意切换播放视频流?
    国标GB28181流媒体服务器分辨率会导致视频无法播放吗?
    国标GB28181流媒体平台集成后播放多个视频部分视频无法播放问题
  • 原文地址:https://www.cnblogs.com/ZhuChangwu/p/11150332.html
Copyright © 2011-2022 走看看