PathNode(Path)
StandardWatchEventKind(WatchEvent)
Watchable(WatchKey WatchService WatchEvent)
WatchKey(PathNode WatchEvent WatchService)
WatchService(WatchKey Path)
WatchEvent
FileSystem(WatchService)
Path>Watchable
WatchEvent
StandardWatchEventKind(ENTRY_CREATE; ENTRY_DELETE; ENTRY_MODIFY; OVERFLOW)
WatchService WatchKey
WatchService
1 register 注册监视目录 ,生成watchedKey(遍历子目录,每个文件都注册一个事件),添加到watchedKeys
1.1
public WatchKey register(Path root, boolean fireCreatedEventOnIndex, WatchEvent.Kind<?>... events) { if (events == null || events.length == 0) throw new UnsupportedOperationException("null events"); if (this.service.isShutdown()) throw new IllegalStateException("服务已经关闭"); if (!root.exists()) throw new IllegalArgumentException("监视的目录不存在"); WatchKey key = new WatchKey(root, this, fireCreatedEventOnIndex, events); resetKey(key); return key; }
1.1.1 WatchKey
public WatchKey(final Path path, final WatchService watcher, boolean fireCreatedEventOnIndex, WatchEvent.Kind<?>... events) { valid = true; this.watcher = watcher; // 建立内存索引 this.root = new PathNode(path, true); if (events != null) { for (WatchEvent.Kind<?> event : events) { filterSet.add(event); } } LinkedList<WatchEvent<?>> changedEvents = new LinkedList<WatchEvent<?>>(); index(this.root, fireCreatedEventOnIndex, changedEvents); this.changedEvents = changedEvents; }
1.1.1.1 WatchKey 为每个目录下子文件建立一个ENTRY_CREATE 监听事件
private void index(PathNode node, boolean fireCreatedEventOnIndex, LinkedList<WatchEvent<?>> changedEvents) { File file = node.getPath().getFile(); if (!file.isDirectory()) { return; } File[] subFiles = file.listFiles(); if (subFiles != null) { for (File subFile : subFiles) { PathNode subNode = new PathNode(new Path(subFile), false); if (fireCreatedEventOnIndex) { changedEvents.add(new WatchEvent<Path>(StandardWatchEventKind.ENTRY_CREATE, 1, subNode.getPath())); } node.addChild(subNode); if (subNode.getPath().isDirectory()) { index(subNode, fireCreatedEventOnIndex, changedEvents); } } } }
2 启动的一个线程,定时扫描watchedKeys,调用watchedKey.check(),
如果有变化,watchedKeys 删掉watchedKey ,changedKeys add watchedKey
public WatchService(long checkInterval) { service = Executors.newSingleThreadScheduledExecutor(); service.scheduleAtFixedRate(new CheckThread(), checkInterval, checkInterval, TimeUnit.MILLISECONDS); }
private final class CheckThread implements Runnable { public void run() { check(); } } /** * 主动check */ public void check() { synchronized (this) { Iterator<WatchKey> it = watchedKeys.iterator(); while (it.hasNext()) { WatchKey key = it.next(); try { if (key.check()) { changedKeys.add(key); it.remove(); } } catch (Throwable t) { log.error("检测WatchKey异常,key=" + key, t); } } } }
2.1 key.check()
boolean check() { if (this.changedEvents != null && this.changedEvents.size() > 0) return true; if (!this.valid) return false; List<WatchEvent<?>> list = new LinkedList<WatchEvent<?>>(); if (check(root, list)) { this.changedEvents = list; return true; } else { return false; } }
2.1.1
private boolean check(PathNode node, List<WatchEvent<?>> changedEvents) { Path nodePath = node.getPath(); File nodeNewFile = new File(nodePath.getAbsolutePath()); if (nodePath != null) { if (node.isRoot()) { if (!nodeNewFile.exists()) return fireOnRootDeleted(changedEvents, nodeNewFile);//触发删除事件,添加到当前watchedKey的changedEvents else { return checkNodeChildren(node, changedEvents, nodeNewFile); } } else { return checkNodeChildren(node, changedEvents, nodeNewFile); } } else throw new IllegalStateException("PathNode没有path"); }
2.1.1.1 监听新增事件 修改事件,若有讲事件添加到当前watchedKey的changedEvents
private boolean checkNodeChildren(PathNode node, List<WatchEvent<?>> changedEvents, File nodeNewFile) { boolean changed = false; Iterator<PathNode> it = node.getChildren().iterator(); // 用于判断是否有新增文件或者目录的现有名称集合 Set<String> childNameSet = new HashSet<String>(); while (it.hasNext()) { PathNode child = it.next(); Path childPath = child.getPath(); childNameSet.add(childPath.getName()); File childNewFile = new File(childPath.getAbsolutePath()); // 1、判断文件是否还存在 if (!childNewFile.exists() && filterSet.contains(StandardWatchEventKind.ENTRY_DELETE)) { changed = true; changedEvents.add(new WatchEvent<Path>(StandardWatchEventKind.ENTRY_DELETE, 1, childPath)); it.remove();// 移除节点 } // 2、如果是文件,判断是否被修改 if (childPath.isFile()) { if (checkFile(changedEvents, child, childNewFile) && !changed) { changed = true; } } // 3、递归检测目录 if (childPath.isDirectory()) { if (check(child, changedEvents) && !changed) { changed = true; } } } // 查看是否有新增文件 File[] newChildFiles = nodeNewFile.listFiles(); if(newChildFiles!=null) for (File newChildFile : newChildFiles) { if (!childNameSet.contains(newChildFile.getName()) && filterSet.contains(StandardWatchEventKind.ENTRY_CREATE)) { changed = true; Path newChildPath = new Path(newChildFile); changedEvents.add(new WatchEvent<Path>(StandardWatchEventKind.ENTRY_CREATE, 1, newChildPath)); PathNode newSubNode = new PathNode(newChildPath, false); node.addChild(newSubNode);// 新增子节点 // 如果是目录,递归调用 if (newChildFile.isDirectory()) { checkNodeChildren(newSubNode, changedEvents, newChildFile); } } } return changed; }
3 如何使用
private void startCheckLocalDir(final String filePath) { final WatchService watcher = FileSystem.getDefault().newWatchService(); Path path = new Path(new File(filePath)); // 注册事件 watcher.register(path, true, StandardWatchEventKind.ENTRY_CREATE, StandardWatchEventKind.ENTRY_DELETE, StandardWatchEventKind.ENTRY_MODIFY); // 第一次运行,主动check checkAtFirst(watcher); singleExecutor.execute(new Runnable() { public void run() { log.debug(">>>>>>已经开始监控目录<<<<<<"); // 无限循环等待事件 while (isRun) { // 凭证 WatchKey key; try { key = watcher.take(); } catch (InterruptedException x) { continue; } // reset,如果无效,跳出循环,无效可能是监听的目录被删除 if (!processEvents(key)) { log.error("reset unvalid,监控服务失效"); break; } } log.debug(">>>>>>退出监控目录<<<<<<"); watcher.close(); } }); }
3.1 处理触发的事件,并继续监听
/** * 处理触发的事件 * * @param key * @return */ @SuppressWarnings({ "unchecked" }) private boolean processEvents(WatchKey key) { /** * 获取事件集合 */ for (WatchEvent<?> event : key.pollEvents()) { // 事件的类型 // WatchEvent.Kind<?> kind = event.kind(); // 通过context方法得到发生事件的path WatchEvent<Path> ev = (WatchEvent<Path>) event; Path eventPath = ev.context(); String realPath = eventPath.getAbsolutePath(); if (ev.kind() == StandardWatchEventKind.ENTRY_CREATE || ev.kind() == StandardWatchEventKind.ENTRY_MODIFY) { String grandpaDir = null; try { grandpaDir = FileUtils.getGrandpaDir(realPath); } catch (Exception e1) { } if (!Constants.BASE_DIR.equals(grandpaDir)) { log.error("无效的文件进入监控目录: " + realPath); continue; } existFiles.put(realPath, System.currentTimeMillis()); if (log.isInfoEnabled()) { log.info(realPath + "文件被添加或更新"); } } else if (ev.kind() == StandardWatchEventKind.ENTRY_DELETE) { String grandpaDir = null; try { grandpaDir = FileUtils.getGrandpaDir(realPath); } catch (Exception e1) { } if (Constants.BASE_DIR.equals(grandpaDir)) { // 删除的是文件 existFiles.remove(realPath); if (log.isInfoEnabled()) { log.info(realPath + "文件被被删除"); } } else { // 删除的是目录 Set<String> keySet = new HashSet<String>(existFiles.keySet()); for (String filePath : keySet) { if (filePath.startsWith(realPath)) { existFiles.remove(filePath); if (log.isInfoEnabled()) { log.info(filePath + "文件被删除"); } } } } } } return key.reset();//WatchService 继续监听 }