countdownlatch是java多线程包concurrent里的一个常见工具类,通过使用它可以借助线程能力极大提升处理响应速度,且实现方式非常优雅。今天我们用一个实际案例和大家来讲解一下如何使用以及需要特别注意的点。
由于线程类的东西都比较抽象,我们换一种讲解思路,先讲解决问题的案例,然后再解释下原理。
假设在微服务架构中,A服务会调用B服务处理一些事情,且每处理一次业务,A可能要调用B多次处理逻辑相同但数据不同的事情。为了提升整个链路的处理速度,我们自然会想到是否可以把A调用B的各个请求组成一个批次,这样A服务只需要调用B服务一次,等B服务处理完一起返回即可,省了多次网络传输的时间。代码如下:
1 /** 2 * 批次请求处理服务 3 * @param batchRequests 批次请求对象列表 4 * @return 5 */ 6 public List<DealResult> deal(List<DealRequest> batchRequests){ 7 List<DealResult> resultList = new ArrayList<>(); 8 if(batchRequests != null){ 9 for(DealRequest request : batchRequests){ 10 //遍历顺序处理单个请求 11 resultList.add(process(request)); 12 } 13 } 14 return resultList; 15 }
但是B服务顺序处理批次里每一个请求的时间并没有节省,假设批次里有3个请求,一个请求平均耗时100MS,则B服务还是要花费300MS来处理完。有什么办法能立刻简单提升3倍处理速度,令总花费时间只需要100MS?到我们的大将countdownlatch出场了!代码如下:
/** * 使用countdownlatch的批次请求处理服务 * @param batchRequests 批次请求对象列表 * @return */ public List<DealResult> countDownDeal(List<DealRequest> batchRequests){ //定义线程安全的处理结果列表 List<DealResult> countDownResultList = Collections.synchronizedList(new ArrayList<DealResult>()); if(batchRequests != null){ //定义countdownlatch线程数,有多少个请求,我们就定义多少个 CountDownLatch runningThreadNum = new CountDownLatch(batchRequests.size()); for(DealRequest request : batchRequests){ //循环遍历请求,并实例化线程(构造函数传入CountDownLatch类型的runningThreadNum),立刻启动 DealWorker dealWorker = new DealWorker(request, runningThreadNum, countDownResultList); new Thread(dealWorker).start(); } try { //调用CountDownLatch的await方法则当前主线程会等待,直到CountDownLatch类型的runningThreadNum清0 //每个DealWorker处理完成会对runningThreadNum减1 //如果等待1分钟后当前主线程都等不到runningThreadNum清0,则认为超时,返回false,可以根据实际情况选择处理或忽视 runningThreadNum.await(1, TimeUnit.MINUTES); } catch (InterruptedException e) { //此处简化处理,非正常中断应该抛出异常或返回错误结果 return null; } } return countDownResultList; } /** * 线程请求处理类 * */ private class DealWorker implements Runnable { /** 正在运行的线程数 */ private CountDownLatch runningThreadNum; /**待处理请求*/ private DealRequest request; /**待返回结果列表*/ private List<DealResult> countDownResultList; /** * 构造函数 * @param request 待处理请求 * @param runningThreadNum 正在运行的线程数 * @param countDownResultList 待返回结果列表 */ private DealWorker(DealRequest request, CountDownLatch runningThreadNum, List<DealResult> countDownResultList) { this.request = request; this.runningThreadNum = runningThreadNum; this.countDownResultList = countDownResultList; } @Override public void run() { try{ this.countDownResultList.add(process(this.request)); }finally{ //当前线程处理完成,runningThreadNum线程数减1,此操作必须在finally中完成,避免处理异常后造成runningThreadNum线程数无法清0 this.runningThreadNum.countDown(); } } }
是不是很简单?下图和上面的代码又做了一个对应,假设有3个请求,则启动3个子线程DealWorker,并实例化值数等于3的CountDownLatch。每当一个子线程处理完成后,则调用countDown操作减1。主线程处于awaiting状态,直到CountDownLatch的值数减到0,则主线程继续resume执行。
在API中是这样描述的:
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
经典的java并发编程实战一书中做了更深入的定义:CountDownLatch属于闭锁的范畴,闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前(上面代码中的runningThreadNumq清0),这扇门一直是关闭的,并且没有任何线程能通过(上面代码中的主线程一直await),当到达结束状态时,这扇门会打开并允许所有线程通过(上面代码中的主线程可以继续执行)。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。
像FutureTask,Semaphore这类在concurrent包里的类也属于闭锁,不过它们和CountDownLatch的应用场景还是有差别的,这个我们在后面的文章里再细说。
使用CountDownLatch有哪些需要注意的点
- 批次请求之间不能有执行顺序要求,否则多个线程并发处理无法保证请求执行顺序
- 各线程都要操作的结果列表必须是线程安全的,比如上面代码范例的countDownResultList
- 各子线程的countDown操作要在finally中执行,确保一定可以执行
- 主线程的await操作需要设置超时时间,避免因子线程处理异常而长时间一直等待,如果中断需要抛出异常或返回错误结果
使用CountDownLatch提高批次处理速度的问题
- 如果一个批次请求数很多,会瞬间占用服务器大量线程。此时必须使用线程池,并限定最大可处理线程数量,否则服务器不稳定性会大福提升。
- 主线程和子线程间的数据传输变得困难,稍不注意会造成线程不安全的问题,且代码可读性有一定下降
下一篇文章我们讲讲FutureTask的应用场景,谢谢!