一、java.util.concurrent介绍
java.util.concurrent包含了许多线程安全,测试良好,高性能的并发模块。创建java.util.concurrent的目的就是要实现Collection框架对数据结构所执行的并发操作。
二、核心组件
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- ThreadFactory
- BlockingQueue
- DelayQueue
- Locks
- Phaser
2.1 Executor
一个接口,其定义了一个接收Runnable对象的方法executor。
1 public class Invoker implements Executor { 2 @Override 3 public void execute(Runnable r) { 4 r.run(); 5 } 6 }
1 public void execute() { 2 Executor executor = new Invoker(); 3 executor.execute( () -> { 4 // task to be performed 5 }); 6 }
2.2 ExecutorService
是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法。
在使用ExecutorService之前,我们需要定义个Runnable类
1 public class Task implements Runnable { 2 @Override 3 public void run() { 4 // task details 5 } 6 }
然后需要创建ExecutorService实例,需要制定线程池的大小
1 ExecutorService executor = Executors.newFixedThreadPool(10);
一旦executor创建完成,我们可以使用executor提交任务
1 public void execute() { 2 executor.submit(new Task()); 3 }
它还提供了两种开箱即用的执行终止方法。第一个是shutdown();它等待所有提交的任务完成执行。另一个方法是shutdownnow(),它立即终止所有挂起/正在执行的任务。
还有另一种方法等待终止(long timeout,timeunit unit),它强制阻塞,直到在触发关闭事件或发生执行超时后,或执行线程本身中断后,所有任务都已完成执行为止。
1 try { 2 executor.awaitTermination( 20l, TimeUnit.NANOSECONDS ); 3 } catch (InterruptedException e) { 4 e.printStackTrace(); 5 }
2.3 ScheduledExecutorService
ScheduledExecutorService和ExecutorService十分相似,但是它能周期性的执行任务。
1 public void execute() { 2 ScheduledExecutorService executorService 3 = Executors.newSingleThreadScheduledExecutor(); 4 5 Future<String> future = executorService.schedule(() -> { 6 // ... 7 return "Hello world"; 8 }, 1, TimeUnit.SECONDS); 9 10 ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> { 11 // ... 12 }, 1, TimeUnit.SECONDS); 13 14 executorService.shutdown(); 15 }
ScheduledExecutorService还可以在固定的延迟后执行任务
1 executorService.scheduleAtFixedRate(() -> { 2 // ... 3 }, 1, 10, TimeUnit.SECONDS); 4 5 executorService.scheduleWithFixedDelay(() -> { 6 // ... 7 }, 1, 10, TimeUnit.SECONDS);
2.4 Future
Future用于表示异步操作的结果。
1 public void invoke() { 2 ExecutorService executorService = Executors.newFixedThreadPool(10); 3 4 Future<String> future = executorService.submit(() -> { 5 // ... 6 Thread.sleep(10000l); 7 return "Hello world"; 8 }); 9 }
我们可以使用以下代码段检查未来的结果是否准备好,并在计算完成后获取数据。
1 if (future.isDone() && !future.isCancelled()) { 2 try { 3 str = future.get(); 4 } catch (InterruptedException | ExecutionException e) { 5 e.printStackTrace(); 6 } 7 }
我们还可以为给定的操作指定超时时间。如果任务花费的时间超过此时间,将引发TimeoutException。
1 try { 2 future.get(10, TimeUnit.SECONDS); 3 } catch (InterruptedException | ExecutionException | TimeoutException e) { 4 e.printStackTrace(); 5 }
2.5 CountDownLatch
CountDownWatch(在JDK5中引入)是一个实用程序类,它阻塞一组线程,直到一些操作完成。
2.6 CyclicBarrier
Cyclicbarrier的工作原理与countdownloach几乎相同,只是我们可以重用它。与CountDownWatch不同,它允许多个线程在调用最终任务之前使用await()方法(称为barrier条件)相互等待。
1 public class Task implements Runnable { 2 3 private CyclicBarrier barrier; 4 5 public Task(CyclicBarrier barrier) { 6 this.barrier = barrier; 7 } 8 9 @Override 10 public void run() { 11 try { 12 LOG.info(Thread.currentThread().getName() + 13 " is waiting"); 14 barrier.await(); 15 LOG.info(Thread.currentThread().getName() + 16 " is released"); 17 } catch (InterruptedException | BrokenBarrierException e) { 18 e.printStackTrace(); 19 } 20 } 21 22 }
现在我们可以调用一些线程来竞争屏障条件。
1 public void start() { 2 3 CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { 4 // ... 5 LOG.info("All previous tasks are completed"); 6 }); 7 8 Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 9 Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 10 Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 11 12 if (!cyclicBarrier.isBroken()) { 13 t1.start(); 14 t2.start(); 15 t3.start(); 16 } 17 }
在这里,isbroken()方法检查执行期间是否有任何线程被中断。在执行实际流程之前,我们应该始终执行此检查。
2.7 Semaphonre
信号量用于阻止线程级对物理或逻辑资源的某些部分的访问。信号量包含一组许可证;每当线程试图进入关键部分时,它需要检查信号量是否有许可证可用。
2.8 ThreadFactory
ThreadFactory充当线程(不存在)池,根据需要创建新线程。它消除了为实现高效的线程创建机制而需要大量样板代码的需要。
1 public class BaeldungThreadFactory implements ThreadFactory { 2 private int threadId; 3 private String name; 4 5 public BaeldungThreadFactory(String name) { 6 threadId = 1; 7 this.name = name; 8 } 9 10 @Override 11 public Thread newThread(Runnable r) { 12 Thread t = new Thread(r, name + "-Thread_" + threadId); 13 LOG.info("created new thread with id : " + threadId + 14 " and name : " + t.getName()); 15 threadId++; 16 return t; 17 } 18 }
1 BaeldungThreadFactory factory = new BaeldungThreadFactory( 2 "BaeldungThreadFactory"); 3 for (int i = 0; i < 10; i++) { 4 Thread t = factory.newThread(new Task()); 5 t.start(); 6 }
2.9 BlockingQueue
在异步编程中,最常见的集成模式之一是生产者-消费者模式。
2.10 DelayQueue
DelayQueue是一个无限大的元素阻塞队列,其中只有在元素的过期时间(即用户定义的延迟)完成时才能提取元素。因此,最上面的元素(head)将具有最大的延迟量,并将在最后进行轮询。
2.11 Locks
Lock是一个实用程序,用于阻止其他线程访问某段代码,除了当前正在执行该段代码的线程之外。
2.12 Phaser
相较于Cyclicbarrier和Countdownloatch,相较于它是一个更灵活的解决方案——用于充当一个可重用的屏障,在这个屏障上,动态线程数需要等待才能继续执行。我们可以协调多个执行阶段,为每个程序阶段重用一个阶段器实例。
参考网址:
http://www.falkhausen.de/Java-8/java.util/concurrent/Future.html
http://tutorials.jenkov.com/java-util-concurrent/index.html