One of the most complex and powerful functionalities offered by the Java concurrency API is the ability to execute concurrent-phased tasks using the Phaser class. This mechanism is useful when we have some concurrent tasks divided into steps. The Phaser class provides us with the mechanism to synchronize the threads at the end of each step, so no thread starts its second step until all the threads have finished the first one.
As with other synchronization utilities, we have to initialize the Phaser class with the number of tasks that participate in the synchronization operation, but we can dynamically modify this number by increasing or decreasing it.
In this recipe, you will learn how to use the Phaser class to synchronize three concurrent tasks. The three tasks look for files with the extension .log modified in the last 24 hours in three different folders and their subfolders. This task is divided into three steps:
- Get a list of the files with the extension .log in the assigned folder and its subfolders.
- Filter the list created in the first step by deleting the files modified more than 24 hours ago.
- Print the results in the console.
At the end of the steps 1 and 2 we check if the list has any elements or not. If it hasn't any element, the thread ends its execution and is eliminated from the the phaser class.
1. Create a class named FileSearch and specify that it implements the Runnable interface. This class implements the operation of searching for files with a determined extension modified in the last 24 hours in a folder and its subfolders.
package com.packtpub.java7.concurrency.chapter3.recipe5.task; import java.io.File; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; /** * This class search for files with an extension in a directory */ public class FileSearch implements Runnable { /** * Initial path for the search */ private String initPath; /** * Extension of the file we are searching for */ private String end; /** * List that stores the full path of the files that have the extension we are searching for */ private List<String> results; /** * Phaser to control the execution of the FileSearch objects. Their execution will be divided * in three phases * 1st: Look in the folder and its subfolders for the files with the extension * 2nd: Filter the results. We only want the files modified today * 3rd: Print the results */ private Phaser phaser; /** * Constructor of the class. Initializes its attributes * @param initPath Initial path for the search * @param end Extension of the files we are searching for * @param phaser Phaser object to control the execution */ public FileSearch(String initPath, String end, Phaser phaser) { this.initPath = initPath; this.end = end; this.phaser=phaser; results=new ArrayList<>(); } /** * Main method of the class. See the comments inside to a better description of it */ @Override public void run() { // Waits for the creation of all the FileSearch objects phaser.arriveAndAwaitAdvance(); System.out.printf("%s: Starting. ",Thread.currentThread().getName()); // 1st Phase: Look for the files File file = new File(initPath); if (file.isDirectory()) { directoryProcess(file); } // If no results, deregister in the phaser and ends if (!checkResults()){ return; } // 2nd Phase: Filter the results filterResults(); // If no results after the filter, deregister in the phaser and ends if (!checkResults()){ return; } // 3rd Phase: Show info showInfo(); phaser.arriveAndDeregister(); System.out.printf("%s: Work completed. ",Thread.currentThread().getName()); } /** * This method prints the final results of the search */ private void showInfo() { for (int i=0; i<results.size(); i++){ File file=new File(results.get(i)); System.out.printf("%s: %s ",Thread.currentThread().getName(),file.getAbsolutePath()); } // Waits for the end of all the FileSearch threads that are registered in the phaser phaser.arriveAndAwaitAdvance(); } /** * This method checks if there are results after the execution of a phase. If there aren't * results, deregister the thread of the phaser. * @return true if there are results, false if not */ private boolean checkResults() { if (results.isEmpty()) { System.out.printf("%s: Phase %d: 0 results. ",Thread.currentThread().getName(),phaser.getPhase()); System.out.printf("%s: Phase %d: End. ",Thread.currentThread().getName(),phaser.getPhase()); // No results. Phase is completed but no more work to do. Deregister for the phaser phaser.arriveAndDeregister(); return false; } else { // There are results. Phase is completed. Wait to continue with the next phase System.out.printf("%s: Phase %d: %d results. ",Thread.currentThread().getName(),phaser.getPhase(),results.size()); phaser.arriveAndAwaitAdvance(); return true; } } /** * Method that filter the results to delete the files modified more than a day before now */ private void filterResults() { List<String> newResults=new ArrayList<>(); long actualDate=new Date().getTime(); for (int i=0; i<results.size(); i++){ File file=new File(results.get(i)); long fileDate=file.lastModified(); if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){ newResults.add(results.get(i)); } } results=newResults; } /** * Method that process a directory * * @param file * : Directory to process */ private void directoryProcess(File file) { // Get the content of the directory File list[] = file.listFiles(); if (list != null) { for (int i = 0; i < list.length; i++) { if (list[i].isDirectory()) { // If is a directory, process it directoryProcess(list[i]); } else { // If is a file, process it fileProcess(list[i]); } } } } /** * Method that process a File * * @param file * : File to process */ private void fileProcess(File file) { if (file.getName().endsWith(end)) { results.add(file.getAbsolutePath()); } } }
2. Implement the main class of the example by creating a class named Main.
package com.packtpub.java7.concurrency.chapter3.recipe5.core; import java.util.concurrent.Phaser; import com.packtpub.java7.concurrency.chapter3.recipe5.task.FileSearch; /** * Main class of the example * */ public class Main { /** * Main method of the example * @param args */ public static void main(String[] args) { // Creates a Phaser with three participants Phaser phaser=new Phaser(3); // Creates 3 FileSearch objects. Each of them search in different directory FileSearch system=new FileSearch("C:\Windows", "log", phaser); FileSearch apps=new FileSearch("C:\Program Files","log",phaser); FileSearch documents=new FileSearch("C:\Documents And Settings","log",phaser); // Creates a thread to run the system FileSearch and starts it Thread systemThread=new Thread(system,"System"); systemThread.start(); // Creates a thread to run the apps FileSearch and starts it Thread appsThread=new Thread(apps,"Apps"); appsThread.start(); // Creates a thread to run the documents FileSearch and starts it Thread documentsThread=new Thread(documents,"Documents"); documentsThread.start(); try { systemThread.join(); appsThread.join(); documentsThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Terminated: %s ",phaser.isTerminated()); } }
The program starts creating a Phaser object that will control the synchronization of the threads at the end of each phase. The constructor of Phaser receives the number of participants as a parameter. In our case, Phaser has three participants. This number indicates to Phaser the number of threads that have to execute an arriveAndAwaitAdvance() method before Phaser changes the phase and wakes up the threads that were sleeping.
Once Phaser has been created, we launch three threads that execute three different FileSearch objects.
The first instruction in the run() method of this FileSearch object is a call to the arriveAndAwaitAdvance() method of the Phaser object. As we mentioned earlier, the Phaser knows the number of threads that we want to synchronize. When a thread calls this method, Phaser decreases the number of threads that have to finalize the actual phase and puts this thread to sleep until all the remaining threads finish this phase. Calling this method at the beginning of the run() method makes none of the FileSearch threads begin their job until all the threads have been created.
At the end of phase one and phase two, we check if the phase has generated results and the list with the results has elements, or otherwise the phase hasn't generated results and the list is empty. In the first case, the checkResults() method calls arriveAndAwaitAdvance() as explained earlier. In the second case, if the list is empty, there's no point in the thread continuing with its execution, so it returns. But you have to notify the phaser that there will be one less participant. For this, we used arriveAndDeregister(). This notifies the phaser that this thread has finished the actual phase, but it won't participate in the future phases, so the phaser won't have to wait for it to continue.
At the end of the phase three implemented in the showInfo() method, there is a call to the arriveAndAwaitAdvance() method of the phaser. With this call, we guarantee that all the threads finish at the same time. When this method ends its execution, there is a call to the arriveAndDeregister() method of the phaser. With this call, we deregister the threads of the phaser as we explained before, so when all the threads finish, the phaser will have zero participants.
Finally, the main() method waits for the completion of the three threads and calls the isTerminated() method of the phaser. When a phaser has zero participants, it enters the so called termination state and this method returns true. As we deregister all the threads of the phaser, it will be in the termination state and this call will print true to the console.
A Phaser object can be in two states:
Active: Phaser enters this state when it accepts the registration of new participants and its synchronization at the end of each phase. In this state, Phaser works as it has been explained in this recipe. This state is not mentioned in the Java concurrency API.
Termination: By default, Phaser enters in this state when all the participants in Phaser have been deregistered, so Phaser has zero participants. More in detail, Phaser is in the termination state when the method onAdvance() returns the true value. If you override that method, you can change the default behavior. When Phaser is on this state, the synchronization method arriveAndAwaitAdvance() returns immediately without doing any synchronization operation.
A notable feature of the Phaser class is that you haven't had to control any exception from the methods related with the phaser. Unlike other synchronization utilities, threads that are sleeping in a phaser don't respond to interruption events and don't throw an InterruptedException exception.
The Phaser class provides other methods related to the change of phase. These methods are as follows:
arrive(): This method notifies the phaser that one participant has finished the actual phase, but it should not wait for the rest of the participants to continue with its execution. Be careful with the utilization of this method, because it doesn't synchronize with other threads.
awaitAdvance(int phase): This method puts the current thread to sleep until all the participants of the phaser have finished the current phase of the phaser, if the number we pass as the parameter is equal to the actual phase of the phaser. If the parameter and the actual phase of the phaser aren't equal, the method returns immediately.
awaitAdvanceInterruptibly(int phaser): This method is equal to the method explained earlier, but it throws an InterruptedException exception if the thread that is sleeping in this method is interrupted.
Registering participants in the Phaser
When you create a Phaser object, you indicate how many participants will have that phaser. But the Phaser class has two methods to increment the number of participants of a phaser. These methods are as follows:
- register(): This method adds a new participant to Phaser. This new participant will be considered as unarrived to the actual phase.
- bulkRegister(int Parties): This method adds the specified number of participants to the phaser. These new participants will be considered as unarrived to the actual phase.
The only method provided by the Phaser class to decrement the number of participants is the arriveAndDeregister() method that notifies the phaser that the thread has finished the actual phase, and it doesn't want to continue with the phased operation.
Forcing the termination of a Phaser
When a phaser has zero participants, it enters a state denoted by Termination. The Phaser class provides forceTermination() to change the status of the phaser and makes it enter in the Termination state independently of the number of participants registered in the phaser. This mechanism may be useful when one of the participants has an error situation, to force the termination of the phaser.
When a phaser is in the Termination state, the awaitAdvance() and arriveAndAwaitAdvance() methods immediately return a negative number, instead of a positive one that returns normally. If you know that your phaser could be terminated, you should verify the return value of those methods to know if the phaser has been terminated.