zoukankan      html  css  js  c++  java
  • 第二部分:并发工具类19->CountDownLatch和CyclicBarrier,多线程步调一致

    1.校对逻辑

    2.单线程里循环查询订单,派送单,执行对账

    
    while(存在未对账订单){
      // 查询未对账订单
      pos = getPOrders();
      // 查询派送单
      dos = getDOrders();
      // 执行对账操作
      diff = check(pos, dos);
      // 差异写入差异库
      save(diff);
    } 
    

    3.并行优化对账系统

    getPOrder()查询未对账订单,查询慢
    getDOrder()查询派送单,查询慢

    优化流程

    单线程执行的系统(串行系统),性能首先想到的是利用多线程并行处理
    getPOrder()和getDOrder()是否可以并行处理?可以,这2个操作没有先后顺序依赖

    代码整改

    
    while(存在未对账订单){
      // 查询未对账订单
      Thread T1 = new Thread(()->{
        pos = getPOrders();
      });
      T1.start();
      // 查询派送单
      Thread T2 = new Thread(()->{
        dos = getDOrders();
      });
      T2.start();
      // 等待T1、T2结束
      T1.join();
      T2.join();
      // 执行对账操作
      diff = check(pos, dos);
      // 差异写入差异库
      save(diff);
    } 
    

    4.用CountDownLatch 实现线程等待

    优化一下,while循环里每次都会创建新线程,创建线程是个耗时的操作。最好创建出来的线程可以循环利用

    优化,创建固定为2的线程池,while里循环利用,主线程怎么知道这2个线程执行完呢?线程池方案里,线程本身就不会退出,不能通过thread.join来判断执行完,退出

    
    // 创建2个线程的线程池
    Executor executor = 
      Executors.newFixedThreadPool(2);
    while(存在未对账订单){
      // 查询未对账订单
      executor.execute(()-> {
        pos = getPOrders();
      });
      // 查询派送单
      executor.execute(()-> {
        dos = getDOrders();
      });
      
      /* ??如何实现等待??*/
      
      // 执行对账操作
      diff = check(pos, dos);
      // 差异写入差异库
      save(diff);
    }   
    ``
    如何实现等待呢?计数器,java的并发包里提供了类似工具类CountDownLatch
    ```java
    
    // 创建2个线程的线程池
    Executor executor = 
      Executors.newFixedThreadPool(2);
    while(存在未对账订单){
      // 计数器初始化为2
      CountDownLatch latch = 
        new CountDownLatch(2);
      // 查询未对账订单
      executor.execute(()-> {
        pos = getPOrders();
        latch.countDown();
      });
      // 查询派送单
      executor.execute(()-> {
        dos = getDOrders();
        latch.countDown();
      });
      
      // 等待两个查询操作结束
      latch.await();
      
      // 执行对账操作
      diff = check(pos, dos);
      // 差异写入差异库
      save(diff);
    }
    

    5.优化性能

    发现check(),save()之间还是串行的
    是不是执行对象操作的时候,可以执行下一轮的查询操作了

    可以思考一下,两次查询操作能够和对账操作并行,对账操作依赖查询操作的结果
    生产者-消费者的套路
    抽象2个队列,订单队列,派送单队列

    T1线程执行订单查询工作,T2线程执行派送单查询工作,
    T1,T2都生产完1条数据后,通知T3线程执行对账操作

    T1,T2要互相等待,步调一致。当线程T1,T2都生产完一条数据,还要通知线程T3执行对账操作

    6.CyclicBarrier实现线程同步

    T1,T2步调一致,能够通知线程T3
    计数器初始2,线程T1,T2生产完一条数据都将计数器减1,如果计数器大于0,则T1/T2等待,如果计数器等于0,通知线程T3,并唤醒等待的线程T1,T2,同时,将计数器重置为2

    线程T1,T2生产下一条数据的时候就可以用这个计数器了

    java并发包里提供了类似功能CyclicBarrier,当计数器减为0的时候,会调用这个回调函数

    T1查询订单,查询出1条调用barrier.await()计数器减1,同时等待计数器变为0
    T2查询派送单,查询出1条时,调用barrier.await()计数器减1,同时等待计数器变为0
    当T1和T2都调用barrier.await()时,计数器会减到0,T1和T2就可以执行下一条语句了,同时会调用barrier回调函数来执行对账操作
    CyclicBarrier计数器有自动重置功能,减到0,自动重置你设置的初始值

    
    // 订单队列
    Vector<P> pos;
    // 派送单队列
    Vector<D> dos;
    // 执行回调的线程池 
    Executor executor = 
      Executors.newFixedThreadPool(1);
    final CyclicBarrier barrier =
      new CyclicBarrier(2, ()->{
        executor.execute(()->check());
      });
      
    void check(){
      P p = pos.remove(0);
      D d = dos.remove(0);
      // 执行对账操作
      diff = check(p, d);
      // 差异写入差异库
      save(diff);
    }
      
    void checkAll(){
      // 循环查询订单库
      Thread T1 = new Thread(()->{
        while(存在未对账订单){
          // 查询订单库
          pos.add(getPOrders());
          // 等待
          barrier.await();
        }
      });
      T1.start();  
      // 循环查询运单库
      Thread T2 = new Thread(()->{
        while(存在未对账订单){
          // 查询运单库
          dos.add(getDOrders());
          // 等待
          barrier.await();
        }
      });
      T2.start();
    }
    
    原创:做时间的朋友
  • 相关阅读:
    PHP:第四章——PHP数组处理函数
    PHP:第四章——PHP数组array_intersect计算数组交集
    PHP:第四章——PHP数组array_diff计算数组差集
    PHP:第四章——PHP数组查找,替换,过滤,判断相关函数
    GPG入门
    GPG入门教程
    运行gpg --gen-key生成key时出现卡住的问题
    程序员练级攻略(2018) 与我的专栏
    构建一个在线ASCII视频流服务
    Ubuntu 16.04配置国内高速apt-get更新源
  • 原文地址:https://www.cnblogs.com/PythonOrg/p/14977829.html
Copyright © 2011-2022 走看看