zoukankan      html  css  js  c++  java
  • CountDownLatch 和 CyclicBarrier

    一、CountDownLatch

    1、概念:

    一个计数器,线程完成一个记录一个,计数器递减,只能用一次。

    使一个线程等待其他线程各自执行完毕后再执行。

    通过计数器实现,计数器初始值是线程的数量,当每个线程执行完毕后,计数器值 -1,当计数器 = 0时,表示所有线程执行完毕,在闭锁上等待的线程就可以恢复工作。

    2、源码:

    一个构造器:

    // 初始化计数器的值
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    三个方法:

    // 1、调用 await()方法的线程会被挂起,直到count = 0 才会继续执行
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    // 2、线程挂起,count = 0 会继续执行,等待时间到达后,如果count != 0 也会执行
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    // 3、将 count 值 -1 
    public void countDown() {
        sync.releaseShared(1);
    }

    事实用例:

    大批量for循环取数据插入数据库。

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(30);
    
    
    public static void main(String[] args) throws Exception {
        ......
        // 初始化
        CountDownLatch countDownLatch=new CountDownLatch(params.size());
        Object[][] data = new Object[params.size()][3];
        for (int i = 0; i < params.size(); i++) {
            Object[] param = params.get(i);
            data[i] = param;
            lockSot(Long.valueOf(param[1].toString()), Integer.valueOf(param[2].toString()), countDownLatch);
        }
        try {
            // 这里进入线程等待,会等待 for循环里所有异步线程执行完毕后,才会往下执行。
            // 如果没有这个等待,就会出现先执行后面代码,for循环的线程后执行完毕的情况。
            countDownLatch.await();
            QueryRunner initRunner = new QueryRunner();
            initRunner.insertBatch(initConn, INSERT_SQL, new ResultSetHandler(){
                @Override
                public Object handle(ResultSet rs) throws SQLException {
                    return null;
                }
            }, data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static void lockSot(Long sid, int slot, CountDownLatch countDownLatch) {
        EXECUTOR_SERVICE.execute(() -> {
            //  这里进行异步线程调用
            Map<String, Object> data = Maps.newHashMap();
            data.put("sid", sid);
            data.put("slot", slot);
            Map<String, String> headerMap = Maps.newHashMap();
            headerMap.put("tk", TK);
            if(LOCK_BOOL) {
                OkHttpUtil.post(lock_url, data, headerMap);
            } else {
                OkHttpUtil.post(unlock_url, data, headerMap);
            }
           
            if(countDownLatch != null) {
                // 这里对计数进行 -1
                countDownLatch.countDown();
            }
        });
    }

     

    二、CyclicBarrier

    1、概念

    计数器是一个阀门,需要所有线程到达,然后继续执行,计数器递增,提供 reset 功能,可以多次复用。

     

    两个构造器:

    // 初始化线程数
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    // barrierAction 最后一个到达线程要做的任务。
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    重要方法:

    // 线程调用 await() 表示已经达到阈值
    // BrokenBarrierException 表示阈值被破坏,原因可能是其中一个线程 await() 时被中断或超时
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    // 2、线程挂起,count = 阈值 会继续执行,等待时间到达后,如果count != 阈值 也会执行
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    
    // 3、重置栅栏 
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    事实用例:

    暂无

     

  • 相关阅读:
    57.大数据线性处理csdn数据(fread,fwrite) 百万数据秒读数据
    56.fread fwrite
    ZOJ 2724 Windows Message Queue (二叉堆,优先队列)
    priority_queue用法(转载)
    Runtime Error(ACCESS_VIOLATION)
    POJ 2309 BST(二叉搜索树)
    POJ 2255 Tree Recovery
    [转载]C++Assert()函数
    POJ 2499 Binary Tree
    POJ 3437 Tree Grafting
  • 原文地址:https://www.cnblogs.com/wgy1/p/15511827.html
Copyright © 2011-2022 走看看