zoukankan      html  css  js  c++  java
  • 分而治之的 CyclicBarrier

    案例

        public static void main(String[] args) throws Exception{
            Map<String,Object> map = new A().test();
            for(Map.Entry<String, Object> e : map.entrySet()){
                System.out.println(e.getKey()+" : "+e.getValue());
            }
    
        }
    
        public Map<String,Object> test()throws Exception{
            final boolean[] flag = {false};
            Map<String, Object> map = new HashMap<>();
    
            CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
                @Override
                public void run() {
                    System.out.println("所有线程都执行完了,合并返回结果");
                    flag[0] = true;
                }
            });
    
            // 异步调用三个类处理业务逻辑
            new Thread(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("线程一执行 自己的一部分工作...");
                    map.put("code",1);
                    barrier.await();
                    System.out.println("线程一执行完");
                }catch (Exception e){
                    e.printStackTrace();
                }
            }).start();
    
            new Thread(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("线程二执行 自己的一部分工作...");
                    map.put("data",null);
                    barrier.await();
                    System.out.println("线程二执行完");
                }catch (Exception e){
                    e.printStackTrace();
                }
            }).start();
    
            new Thread(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("线程三执行 自己的一部分工作...");
                    map.put("msg", "删除成功");
                    barrier.await();
                    System.out.println("线程三执行完");
                }catch (Exception e){
                    e.printStackTrace();
                }
            }).start();
    
            // 等三个任务执行完
            while(!flag[0]){
                //Thread.sleep(100);
            }
            return map;
        }

    源码阅读

    创建
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            // 就是将设置的 parties 和 runnable 存起来
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
    
    await()
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            //1. 首先上来就加上了 ReentrantLock 锁
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
                // 2. 我们初始化的时候就设置了count,现在递减
                int index = --count;
                if (index == 0) {  // tripped
                    // 3. 这是第一种情况,index==0 说明是最后一个线程在执行任务了
                    boolean ranAction = false;
                    try {
                        // 4. 获取到我们初始化传入的命令,也就是说我们的那个 run() 不是线程来着,就是一个普通方法。
                        final Runnable command = barrierCommand;
                        if (command != null)
                            // 5. 执行 run() 方法
                            command.run();
                        ranAction = true;
                        // 6. 将 count重置成 partities数量,并通过Condition.signalAll(),唤醒在队列里排队的线程,也就是调用 await() 的线程,他们会尝试获取lock锁,因为那些线程此时被唤醒过后,都会进入Lock锁的AQS锁等待队列里去。
                        nextGeneration();
                        return 0;
                    } finally {
                        // 7. 唤醒后面节点
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                        // 3. 这里就是第二中情况了
                        // Condition.await(),底层,其实是释放了当前的lock锁,触发了把当前线程加入condition等待队列里,挂起当前线程
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

  • 相关阅读:
    WSGIRequest对象 和querydict对象
    限制请求method及页面重定向
    ORM模型里连接数据库常用方法和QuerySet API
    orm模型(关于时区时间)
    spring mvc 前后端数据交互方式(整理)
    java 国际化(转载)
    spring 基础学习笔记
    (转载)java nio 原理解析
    collection 所有集合的接口。
    java.lang.String类
  • 原文地址:https://www.cnblogs.com/wlwl/p/15054128.html
Copyright © 2011-2022 走看看