zoukankan      html  css  js  c++  java
  • Atitit WatchService 使用和不能监控抓取到的解决 原因是生成速度太快,但处理速度慢,导致许多event 忽视了.. How to solu??? asyn to process

    Atitit WatchService 使用和不能监控抓取到的解决

     

    原因是生成速度太快,但处理速度慢,导致许多event 忽视了..

    How to solu???   asyn to process event then ok..

     

    C:\Users\Administrator\Documents\l1 code t1\filemonitor.java.

     

     

    package com.censoft.common;

     

    import java.io.File;

    import java.io.FileFilter;

    import java.io.IOException;

    import java.nio.file.FileSystems;

    import java.nio.file.FileVisitResult;

    import java.nio.file.Files;

    import java.nio.file.Path;

    import java.nio.file.SimpleFileVisitor;

    import java.nio.file.StandardWatchEventKinds;

    import java.nio.file.WatchEvent;

    import java.nio.file.WatchKey;

    import java.nio.file.WatchService;

    import java.nio.file.attribute.BasicFileAttributes;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

     

    import org.apache.commons.io.FileUtils;

    import org.apache.commons.logging.Log;

    import org.apache.commons.logging.LogFactory;

    import org.apache.log4j.Logger;

    import org.joda.time.DateTime;

     

    import com.censoft.WatchFilePathTask;

    import com.censoft.service.impl.FilesServiceImpl;

     

    public class filemonitor {

    static org.apache.log4j.Logger logger =Logger.getLogger(filemonitor.class);

    public static void main(String[] args) throws InterruptedException,

    IOException {

    logger.info("TTT");

    String filePath = "d:\\temp\\filenew";

     

     

     

    watchFileMethod_threadpool(filePath, new Consumer() {

     

    public void accept(String fname) {

    // asve to db

     

    }

    });

     

    }

     

    public static ExecutorService ExecutorService1_theardpool = Executors

    .newFixedThreadPool(200);

     

    /**

     * 带线程池的公共监测方法

     * @param filePath

     * @param customProcess

     * @throws IOException

     * @throws InterruptedException

     */

    public static void watchFileMethod_threadpool(String filePath,

    final Consumer customProcess) throws IOException,

    InterruptedException {

     

    watchFileMethod(filePath, new Consumer() {

     

    public void accept(final String fname) {

    System.out.println(fname);

     

    // .......

    ExecutorService1_theardpool.submit(new Runnable() {

     

    public void run() {

    // .......

    //

    customProcess.accept(fname);

    }

     

    }); // end sumbit

     

    }

    });

    }

     

     

    /**

     * 监测目录文件夹的变动

     * @param filePath

     * @param customProcess

     * @throws IOException

     * @throws InterruptedException

     */

    private static void watchFileMethod(String filePath,

    Consumer customProcess) throws IOException,

    InterruptedException {

    Log log = LogFactory.getLog(WatchFilePathTask.class);

    // 获取监控服务

    // 可同时获取多个监控服务

    final WatchService watchService = FileSystems.getDefault()

    .newWatchService();

    log.debug("获取监控服务" + watchService);

     

    FileUtils.forceMkdir(new File(filePath));

     

    Path path = FileSystems.getDefault().getPath(filePath);

    log.debug("@@@:Path:" + path);

    // BasicFileAttributes

    // 处理下级多层目录

    Files.walkFileTree(path, new SimpleFileVisitor<Path>() {

     

    @Override

    public FileVisitResult preVisitDirectory(Path dir,

    BasicFileAttributes attrs) throws IOException {

    dir.register(watchService,

    StandardWatchEventKinds.ENTRY_CREATE);

    return FileVisitResult.CONTINUE;

     

    }

    });

     

    // 注册监控服务,监控新增事件

    // 将指定的监控器注册给Path对象所代表的文件节点,register方法返回的就是节点的监控池

    /**

     * p.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY,

     * StandardWatchEventKinds.ENTRY_DELETE,

     * StandardWatchEventKinds.ENTRY_CREATE);

     */

    WatchKey WatchKey1 = path.register(watchService,

    StandardWatchEventKinds.ENTRY_CREATE);

    while (true) {

    // 尝试获取下一个变化信息,如果没有则一直等待

    // 长时间一直监控需要用take,如果是指定时间监控则用poll

    WatchKey1 = watchService.take();

    for (WatchEvent<?> WatchEvent1 : WatchKey1.pollEvents()) { // 获取事件列表

    // System.out.println("["+path+"/"+event.context()+"]文件发生了["+event.kind()+"]事件");

     

    // 检查文件名是否符合要求

    Path curdir1 = (Path) WatchKey1.watchable();

    processEvent_asyn(customProcess, watchService, WatchEvent1, curdir1);

     

    }

    // 重置其关联的监控器

    WatchKey1.reset();

     

    }

    }

     

     

    private static void processEvent_asyn(final Consumer customProcess,

    final WatchService watchService, final WatchEvent  watchEvent1, final Path curdir1) {

    new Thread(new Runnable() {

     

    public void run() {

    try {

    processEvent(customProcess, watchService, watchEvent1, curdir1);

    } catch (IOException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

     

    }

    }).start();

     

    }

     

     

    private static void processEvent(Consumer customProcess,

    final WatchService watchService, WatchEvent<?> WatchEvent1,

    Path curdir1) throws IOException {

    // 获取目录下新增的文件名

    //

    String fileName = WatchEvent1.context().toString();

    String filePath2 = curdir1.toFile().getAbsolutePath()

    .toString()

    + File.separator + fileName;

    // log.info("import filePath:"+filePath2);

     

    customProcess.accept(filePath2);

     

    // 处理新目录

    Path curdir =curdir1;

    Path Path1_newDir = curdir.resolve(fileName);

    logger.info( "watchFileMethodlog:"+Path1_newDir);

    if (Files.isDirectory(Path1_newDir)) {

    logger.info( "watchFileMethodlog is dir:"+Path1_newDir);

    Path1_newDir.register(watchService,

    StandardWatchEventKinds.ENTRY_CREATE);

     

    // 处理下级多层目录

    Files.walkFileTree(Path1_newDir, new SimpleFileVisitor<Path>() {

     

    @Override

    public FileVisitResult preVisitDirectory(Path dir,

    BasicFileAttributes attrs) throws IOException {

    dir.register(watchService,

    StandardWatchEventKinds.ENTRY_CREATE);

    return FileVisitResult.CONTINUE;

     

    }

    });

    }

    }

     

    }

     

  • 相关阅读:
    我理解的朴素贝叶斯模型
    P2P贷款全攻略,贷前、贷中、贷后工作事项解析
    Jupyter Notebook 快速入门
    R语言|数据特征分析
    R语言︱处理缺失数据&&异常值检验、离群点分析、异常值处理
    mysql explain执行计划详解
    R语言中的回归诊断-- car包
    一行代码搞定 R 语言模型输出!(使用 stargazer 包)
    基于R语言的时间序列指数模型
    基于R语言的ARIMA模型
  • 原文地址:https://www.cnblogs.com/attilax/p/15197382.html
Copyright © 2011-2022 走看看