Atitit WatchService 使用和不能监控抓取到的解决
原因是事件生成速度太快,而处理速度太慢,导致许多 event 被丢失(没有被处理到)。
How to solve it? Process the events asynchronously (asyn); then it works.
C:\Users\Administrator\Documents\l1 code t1\filemonitor.java.
package com.censoft.common;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import com.censoft.WatchFilePathTask;
import com.censoft.service.impl.FilesServiceImpl;
/**
 * Watches a directory tree for newly created entries and reports the absolute
 * path of each new entry to a caller-supplied {@code Consumer}.
 *
 * <p>Only {@code ENTRY_CREATE} events are registered. Directories that appear
 * after the watch has started are registered (recursively) as soon as their
 * creation event is seen, so entries created inside them are reported too.
 */
public class filemonitor {
    static org.apache.log4j.Logger logger = Logger.getLogger(filemonitor.class);

    /**
     * Shared worker pool on which the callbacks are executed, so that a slow
     * callback never blocks the thread that drains the watch service.
     */
    public static ExecutorService ExecutorService1_theardpool = Executors
            .newFixedThreadPool(200);

    /**
     * Demo entry point: watches a hard-coded directory with a no-op callback.
     *
     * @param args unused
     * @throws InterruptedException if the watching thread is interrupted
     * @throws IOException if the directory cannot be created or registered
     */
    public static void main(String[] args) throws InterruptedException,
            IOException {
        logger.info("TTT");
        String filePath = "d:\\temp\\filenew";
        watchFileMethod_threadpool(filePath, new Consumer() {
            public void accept(String fname) {
                // save to db
            }
        });
    }

    /**
     * Watches {@code filePath} recursively and runs {@code customProcess} on
     * the shared thread pool for every newly created entry.
     *
     * <p>This call blocks the calling thread for as long as the watch runs.
     *
     * @param filePath root directory to watch; created if it does not exist
     * @param customProcess callback receiving the absolute path of each new
     *        entry; invoked on a pool thread, not on the calling thread
     * @throws IOException if the directory cannot be created or registered
     * @throws InterruptedException if the watching thread is interrupted
     */
    public static void watchFileMethod_threadpool(String filePath,
            final Consumer customProcess) throws IOException,
            InterruptedException {
        watchFileMethod(filePath, new Consumer() {
            public void accept(final String fname) {
                System.out.println(fname);
                ExecutorService1_theardpool.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            customProcess.accept(fname);
                        } catch (RuntimeException e) {
                            // The Future returned by submit() is discarded, so
                            // without this log the failure would vanish silently.
                            logger.error("watchFileMethodlog: callback failed for " + fname, e);
                        }
                    }
                });
            }
        });
    }

    /**
     * Registers every directory under {@code filePath} for creation events and
     * then loops forever, dispatching each event that arrives.
     *
     * @param filePath root directory to watch; created if it does not exist
     * @param customProcess callback receiving the absolute path of each new entry
     * @throws IOException if the directory cannot be created or registered
     * @throws InterruptedException if interrupted while waiting for events
     */
    private static void watchFileMethod(String filePath,
            Consumer customProcess) throws IOException,
            InterruptedException {
        Log log = LogFactory.getLog(WatchFilePathTask.class);
        // try-with-resources releases the native watch handles when the loop
        // is left through an exception (interrupt, I/O failure, ...).
        try (WatchService watchService = FileSystems.getDefault()
                .newWatchService()) {
            log.debug("获取监控服务" + watchService);
            FileUtils.forceMkdir(new File(filePath));
            Path path = FileSystems.getDefault().getPath(filePath);
            log.debug("@@@:Path:" + path);
            // Registers the root itself and every nested directory.
            registerTree(path, watchService);
            while (true) {
                // take() blocks until a key is signalled; poll() would be the
                // choice for a time-limited watch.
                WatchKey key = watchService.take();
                Path watchedDir = (Path) key.watchable();
                for (WatchEvent<?> event : key.pollEvents()) {
                    processEvent_asyn(customProcess, watchService, event, watchedDir);
                }
                // The key must be reset to receive further events. A false
                // result means it is no longer valid (e.g. the directory was
                // removed); other directories are still being watched.
                if (!key.reset()) {
                    log.warn("watchFileMethodlog: watch key is no longer valid for " + watchedDir);
                }
            }
        }
    }

    /**
     * Handles one watch event: registers a newly created directory right away
     * on the calling thread, then hands the callback to the shared pool.
     *
     * <p>Registration is deliberately not deferred to a worker thread: the
     * sooner a new directory is registered, the fewer entries created inside
     * it can be missed. NOTE(review): entries that already exist in a new
     * directory before it is registered still produce no event.
     *
     * @param customProcess callback receiving the absolute path of the new entry
     * @param watchService service with which new directories are registered
     * @param watchEvent1 the event to handle
     * @param curdir1 the watched directory in which the event occurred
     */
    private static void processEvent_asyn(final Consumer customProcess,
            final WatchService watchService, final WatchEvent<?> watchEvent1, final Path curdir1) {
        Object context = watchEvent1.context();
        // OVERFLOW can be delivered regardless of the registered kinds and
        // may carry a null context; there is no file name to report.
        if (watchEvent1.kind() == StandardWatchEventKinds.OVERFLOW || context == null) {
            logger.warn("watchFileMethodlog: events were lost or discarded under " + curdir1);
            return;
        }
        final String fileName = context.toString();
        try {
            registerIfDirectory(curdir1.resolve(fileName), watchService);
        } catch (IOException | RuntimeException e) {
            // A failure for one entry must not stop the watch loop.
            logger.error("watchFileMethodlog: cannot register " + fileName + " under " + curdir1, e);
        }
        final String filePath2 = curdir1.toFile().getAbsolutePath()
                + File.separator + fileName;
        // Use the bounded pool rather than one new thread per event, so a
        // burst of events cannot create an unbounded number of threads.
        ExecutorService1_theardpool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    customProcess.accept(filePath2);
                } catch (RuntimeException e) {
                    logger.error("watchFileMethodlog: processing failed for " + filePath2, e);
                }
            }
        });
    }

    /**
     * Registers {@code candidate} and all directories below it when it is a
     * directory; does nothing beyond logging otherwise.
     */
    private static void registerIfDirectory(Path candidate,
            WatchService watchService) throws IOException {
        logger.info("watchFileMethodlog:" + candidate);
        if (Files.isDirectory(candidate)) {
            logger.info("watchFileMethodlog is dir:" + candidate);
            registerTree(candidate, watchService);
        }
    }

    /**
     * Registers {@code root} and every directory nested below it for
     * {@code ENTRY_CREATE} events.
     */
    private static void registerTree(Path root,
            final WatchService watchService) throws IOException {
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir,
                    BasicFileAttributes attrs) throws IOException {
                dir.register(watchService,
                        StandardWatchEventKinds.ENTRY_CREATE);
                return FileVisitResult.CONTINUE;
            }
        });
    }
}