Phaser运行阶段性并发任务【在指定的文件夹和子文件夹中获得文件扩展名为.log的文件列表】
2016-07-17 22:49
483 查看
运行阶段性并发任务
Java 并发 API 提供的一个非常复杂且强大的功能是,能够使用Phaser类运行阶段性的并发任务。当某些并发任务是分成多个步骤来执行时,那么此机制是非常有用的。Phaser类提供的机制是在每个步骤的结尾同步线程,所以除非全部线程完成第一个步骤,否则线程不能开始进行第二步。相对于其他同步应用,我们必须初始化Phaser类与这次同步操作有关的任务数,我们可以通过增加或者减少来不断的改变这个数。
在这个指南,你将学习如果使用Phaser类来同步3个并发任务。这3个任务会在3个不同的文件夹和它们的子文件夹中搜索扩展名是.log并在24小时内修改过的文件。这个任务被分成3个步骤:
在指定的文件夹和子文件夹中获得文件扩展名为.log的文件列表。
过滤第一步的列表中修改超过24小时的文件。
在操控台打印结果。
在步骤1和步骤2的结尾我们要检查列表是否为空。如果为空,那么线程直接结束运行并从phaser类中淘汰。
package com.packtpub.java7.concurrency.chapter3.recipe5.task; import java.io.File; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; /** * This class search for files with an extension in a directory */ public class FileSearch implements Runnable { /** * Initial path for the search */ private String initPath; /** * Extension of the file we are searching for */ private String end; /** * List that stores the full path of the files that have the extension we are searching for */ private List<String> results; /** * Phaser to control the execution of the FileSearch objects. Their execution will be divided * in three phases * 1st: Look in the folder and its subfolders for the files with the extension * 2nd: Filter the results. We only want the files modified today * 3rd: Print the results */ private Phaser phaser; /** * Constructor of the class. Initializes its attributes * @param initPath Initial path for the search * @param end Extension of the files we are searching for * @param phaser Phaser object to control the execution */ public FileSearch(String initPath, String end, Phaser phaser) { this.initPath = initPath; this.end = end; this.phaser=phaser; results=new ArrayList<>(); } /** * Main method of the class. See the comments inside to a better description of it */ @Override public void run() { // Waits for the creation of all the FileSearch objects phaser.arriveAndAwaitAdvance(); System.out.printf("%s: Starting.\n",Thread.currentThread().getName()); // 1st Phase: Look for the files File file = new File(initPath); if (file.isDirectory()) { directoryProcess(file); } // If no results, deregister in the phaser and ends if (!checkResults()){ return; } // 2nd Phase: Filter the results filterResults(); // If no results after the filter, deregister in the phaser and ends if (!checkResults()){ return; } // 3rd Phase: Show info showInfo(); phaser.arriveAndDeregister(); System.out.printf("%s: Work completed.\n",Thread.currentThread().getName()); } /** * This method prints the final results of the search */ private void showInfo() { for (int i=0; i<results.size(); i++){ File file=new File(results.get(i)); System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath()); } // Waits for the end of all the FileSearch threads that are registered in the phaser phaser.arriveAndAwaitAdvance(); } /** * This method checks if there are results after the execution of a phase. If there aren't * results, deregister the thread of the phaser. * @return true if there are results, false if not */ private boolean checkResults() { if (results.isEmpty()) { System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase()); System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase()); // No results. Phase is completed but no more work to do. Deregister for the phaser phaser.arriveAndDeregister(); return false; } else { // There are results. Phase is completed. Wait to continue with the next phase System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size()); phaser.arriveAndAwaitAdvance(); return true; } } /** * Method that filter the results to delete the files modified more than a day before now */ private void filterResults() { List<String> newResults=new ArrayList<>(); long actualDate=new Date().getTime(); for (int i=0; i<results.size(); i++){ File file=new File(results.get(i)); long fileDate=file.lastModified(); if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){ newResults.add(results.get(i)); } } results=newResults; } /** * Method that process a directory * * @param file * : Directory to process */ private void directoryProcess(File file) { // Get the content of the directory File list[] = file.listFiles(); if (list != null) { for (int i = 0; i < list.length; i++) { if (list[i].isDirectory()) { // If is a directory, process it directoryProcess(list[i]); } else { // If is a file, process it fileProcess(list[i]); } } } } /** * Method that process a File * * @param file * : File to process */ private void fileProcess(File file) { if (file.getName().endsWith(end)) { results.add(file.getAbsolutePath()); } } public static void main(String[] args) { // Creates a Phaser with three participants Phaser phaser=new Phaser(3); // Creates 3 FileSearch objects. Each of them search in different directory FileSearch system=new FileSearch("C:\\Windows", "log", phaser); FileSearch apps=new FileSearch("C:\\Program Files","log",phaser); FileSearch documents=new FileSearch("C:\\Documents And Settings","log",phaser); // Creates a thread to run the system FileSearch and starts it Thread systemThread=new Thread(system,"System"); systemThread.start(); // Creates a thread to run the apps FileSearch and starts it Thread appsThread=new Thread(apps,"Apps"); appsThread.start(); // Creates a thread to run the documents FileSearch and starts it Thread documentsThread=new Thread(documents,"Documents"); documentsThread.start(); try { systemThread.join(); appsThread.join(); documentsThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Terminated: %s\n",phaser.isTerminated()); } }
结果:
Documents: Starting.
Apps: Starting.
System: Starting.
Documents: Phase 1: 0 results.
Documents: Phase 1: End.
Apps: Phase 1: 6 results.
System: Phase 1: 22 results.
Apps: Phase 2: 1 results.
System: Phase 2: 4 results.
System: C:\Windows\inf\setupapi.app.log
Apps: C:\Program Files\Intel\WiFi\UnifiedLogging\ds-debug.log
System: C:\Windows\PFRO.log
System: C:\Windows\setupact.log
System: C:\Windows\System32\catroot2\edb.log
System: Work completed.
Apps: Work completed.
Terminated: true
它是怎么工作的…
这程序开始创建的 Phaser 对象是用来在每个phase的末端控制线程的同步。Phaser的构造函数接收参与者的数量作为参数。在这里,Phaser有3个参与者。这个数向Phaser表示 Phaser改变phase之前执行 arriveAndAwaitAdvance() 方法的线程数,并叫醒正在休眠的线程。一旦Phaser被创建,我们运行3个线程分别执行3个不同的FileSearch对象。
在例子里,我们使用 Windows operating system 的路径。如果你使用的是其他操作系统,那么修改成适应你的环境的路径。
FileSearch对象的 run() 方法中的第一个指令是调用Phaser对象的 arriveAndAwaitAdvance() 方法。像之前提到的,Phaser知道我们要同步的线程的数量。当某个线程调用此方法,Phaser减少终结actual phase的线程数,并让这个线程进入休眠 直到全部其余线程结束phase。在run() 方法前面调用此方法,没有任何 FileSearch 线程可以开始他们的工作,直到全部线程被创建。
在phase 1 和 phase 2 的末端,我们检查phase 是否生成有元素的结果list,或者它没有生成结果且list为空。在第一个情况,checkResults() 方法之前提的调用 arriveAndAwaitAdvance()。在第二个情况,如果list为空,那就没有必要让线程继续了,就直接返回吧。但是你必须通知phaser,将会少一个参与者。为了这个,我们使用arriveAndDeregister()。它通知phaser线程结束了actual phase, 但是它将不会继续参见后面的phases,所以请phaser不要再等待它了。
在phase3的结尾实现了 showInfo() 方法, 调用了 phaser 的 arriveAndAwaitAdvance() 方法。这个调用,保证了全部线程在同一时间结束。当此方法结束执行,有一个调用phaser的arriveAndDeregister() 方法。这个调用,我们撤销了对phaser线程的注册,所以当全部线程结束时,phaser 有0个参与者。
最后,main() 方法等待3个线程的完成并调用phaser的 isTerminated() 方法。当phaser 有0个参与者时,它进入termination状态,此状态与此调用将会打印true到操控台。
Phaser 对象可能是在这2中状态:
Active: 当 Phaser 接受新的参与者注册,它进入这个状态,并且在每个phase的末端同步。 在此状态,Phaser像在这个指南里解释的那样工作。此状态不在Java 并发 API中。 Termination: 默认状态,当Phaser里全部的参与者都取消注册,它进入这个状态,所以这时 Phaser 有0个参与者。更具体的说,当onAdvance() 方法返回真值时,Phaser 是在这个状态里。如果你覆盖那个方法,你可以改变它的默认行为。当 Phaser 在这个状态,同步方法 arriveAndAwaitAdvance()会 立刻返回,不会做任何同步。
Phaser 类的一个显著特点是你不需要控制任何与phaser相关的方法的异常。不像其他同步应用,线程们在phaser休眠不会响应任何中断也不会抛出 InterruptedException 异常。只有一个异常会在下面的‘更多’里解释。
本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González
转自:http://ifeve.com/thread-synchronization-utilities-6-2/
相关文章推荐
- Android Manifest 用法
- 什么是 GraphQL?
- Spark RDD API详解(一) Map和Reduce
- Spring Boot 开发微服务
- lwn拾遗:[sn3218 led drivers]-api解释-1
- 页面元素查找之Selectors API
- php7 扩展类的写法[2]
- php7 类的方法传参[3]
- php7 读取php.ini[4]
- 一个小型js框架myJSFrame附API使用帮助
- C#线程间不能调用剪切板的解决方法
- C#、ASP.NET通用扩展工具类之TypeParse
- Lua编程示例(二):面向对象、metatable对表进行扩展
- 详细分析交换机、路由器、集线器的区别和联系
- PowerShell打开或关闭光驱
- Windows Powershell扩展类型系统
- 批处理的api WMIC学习体会有感第1/2页
- 批处理 API实现文件下载的代码第1/2页
- Lua教程(十七):C API简介
- 强制删除工具 xdelbox xdelbox1.5正式版下载