Phaser允许并发多阶段任务。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
一个Phaser对象有两种状态:
- 活跃态(Active):当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。
- 终止态(Termination):当所有参与同步的线程都取消注册的时候,Phaser就处于终止态,在终止状态下,Phaser没有任何参与者。当Phaser对象onAdvance()方法返回True时,Phaser对象就处于终止态。当Phaser处于终止态时,同步方法arriveAndAwaitAdvance()会立即返回,而且不会做任何同步操作。
Phaser对象的主要方法:
- arrive():这个方法通知phaser对象一个参与者已经完成当前阶段,但是它不应该等待其他参与者都完成当前阶段任务。必须使用这个方法,因为它不会与其他线程同步。
- awaitAdvance(int phase).如果传入的参数与当前阶段一直,这个方法将会将当前线程置于休眠,直到这个阶段的参与者都完成运行。如果传入的阶段参数与当前阶段不一致,立即返回。
- arriveAndAwaitAdvance().当一个线程调用此方法时,Phaser对象将减1,并把这个线程至于休眠状态,直到所有其他线程完成这个阶段。
- arriveAndDeregister().当一个线程调用此方法时,Phaser对象将减1,并且通知这个线程已经完成了当前语句,不会参加到下一个阶段中,因此phaser对象在开始下一个阶段时不会等待这个线程。
- awaitAdvanceInterruptibly(int phase).这个方法跟awaitAdvance(int phase)一样,不同之处是,如果这个方法中休眠的线程被中断,它将抛出InterruptedException异常。
- register():这个方法将一个新的参与者注册到phaser中,这个新的参与者将被当成没有执行完本阶段的线程。
- bulkRegister(int Parties):这个方法将指定数目的参与者注册到Phaser中,所有的这些参与者都讲被当成没有执行完本阶段的线程。
下面将通过具体的实例来讲解Phaser的用法。在实例中Phaser将同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内改过扩展名为。log的文件。这个任务分解为三个步骤:
1.在指定文件夹及其子文件夹中获得扩展名为.log的文件。
2.对第一步的结果过滤,删除修改时间超过24小时的文件。
3.将结果打印数据到控制台。
public class FileSearch implements Runnable { private String initPath;// 查找路径 private String end;// 文件后缀 private List<String> results;// 结果集 private Phaser phaser; public FileSearch(String initPath, String end, Phaser phaser) { this.initPath = initPath; this.end = end; this.phaser = phaser; this.results = new ArrayList<String>(); } private void direactoryProcess(File file) { File list[] = file.listFiles(); if (list != null) { for (File file2 : list) { if (file2.isDirectory()) { direactoryProcess(file2); } else { fileProcess(file2); } } } } private void fileProcess(File file) { if (file.getName().endsWith(end)) { results.add(file.getAbsolutePath()); } } private void filterResult() { List<String> newResult = new ArrayList<String>(); long actualDate = new Date().getTime(); for (int i = 0; i < results.size(); i++) { File file = new File(results.get(i)); long lastModifyTime = file.lastModified(); if (actualDate - lastModifyTime < TimeUnit.MICROSECONDS.convert(1, TimeUnit.DAYS)) { newResult.add(results.get(i)); } } results = newResult; } private boolean checkResults() { if (results.isEmpty()) { System.out.println(Thread.currentThread().getName() + ": Phase " + phaser.getPhase() + " 0 result"); System.out.println(Thread.currentThread().getName() + ": Phase " + phaser.getPhase() + " end"); phaser.arriveAndDeregister(); return false; } else { System.out.println(Thread.currentThread().getName() + ": Phase " + phaser.getPhase() + " " + results.size() + " result"); phaser.arriveAndAwaitAdvance(); return true; } } private void showInfo() { for (int i = 0; i < results.size(); i++) { System.out.println(Thread.currentThread().getName() + ":" + results.get(i)); } phaser.arriveAndAwaitAdvance(); } @Override public void run() { phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName()+": Starting"); File file=new File(initPath); if(file.isDirectory()){ direactoryProcess(file); } if(!checkResults()){ return; } filterResult(); if(!checkResults()){ return; } showInfo(); phaser.arriveAndDeregister(); System.out.println(Thread.currentThread().getName()+": Work completed"); } }
public class PhaserMain { public static void main(String[] args) { Phaser phaser=new Phaser(3); FileSearch system=new FileSearch("c:\Windows", "log", phaser); FileSearch apps=new FileSearch("C:\Programs Files", "log", phaser); FileSearch documents=new FileSearch("C:\Documents And Settings", "log", phaser); Thread systemThread=new Thread(system, "system"); systemThread.start(); Thread appsThread=new Thread(apps, "apps"); appsThread.start(); Thread documentsThread=new Thread(documents, "documents"); documentsThread.start(); try { systemThread.join(); appsThread.join(); documentsThread.join(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Terminated:"+ phaser.isTerminated()); } }