zoukankan      html  css  js  c++  java
  • java利用WatchService实时监控某个目录下的文件变化并按行解析

    转载自:http://www.cnblogs.com/zishengY/p/6958564.html

    首先说下需求:通过ftp上传约定格式的文件到服务器指定目录下,应用程序能实时监控该目录下文件变化,如果上传的文件格式符合要求,将将按照每一行读取解析再写入到数据库,解析完之后再将文件改名。

    一. 一开始的思路

      设置一个定时任务,每隔一分钟读取下指定目录下的文件变化,如果有满足格式的文件,就进行解析。

         这种方式很繁琐,而且效率低,效率都消耗在了遍历、保存状态、对比状态上了! 而且无法利用OS的很多功能。

    二. WatchService介绍

    1、 该类的对象就是操作系统原生的文件系统监控器!我们都知道OS自己的文件系统监控器可以监控系统上所有文件的变化,这种监控是无需遍历、无需比较的,是一种基于信号收发的监控,因此效率一定是最高的;现在Java对其进行了包装,可以直接在Java程序    中使用OS的文件系统监控器了;

    2、 获取当前OS平台下的文件系统监控器:

             i. WatchService watcher = FileSystems.getDefault().newWatchService();

             ii. 从FileSystems这个类名就可以看出这肯定是属于OS平台文件系统的,接下来可以看出这一连串方法直接可以得到一个文件监控器;

      这里暂时不用深入理解这串方法的具体含义,先知道怎么用就行了;

    3、 我们都知道,操作系统上可以同时开启多个监控器,因此在Java程序中也不例外,上面的代码只是获得了一个监控器,你还可以用同样的代码同时获得多个监控器;

    4、 监控器其实就是一个后台线程,在后台监控文件变化所发出的信号,这里通过上述代码获得的监控器还只是一个刚刚初始化的线程,连就绪状态都没有进入,只是初始化而已;

    三、实现过程

      其实就是在初始化的时候创建一个线程,然后用watchService实时监控该目录下文件变化,如果有满足条件文件加进来,就按照约定的格式解析文件再写入数据库,详细步骤如下!

    1、web.xml监听器配置文件监控监听器 

    <?xml version="1.0" encoding="UTF-8"?>
    <web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://java.sun.com/xml/ns/javaee 
        http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">
    
        <context-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:root-context.xml</param-value>
        </context-param>
    
        <filter>
            <filter-name>CharacterEncodingFilter</filter-name>
            <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
            <init-param>
                <param-name>encoding</param-name>
                <param-value>UTF-8</param-value>
            </init-param>
            <init-param>
                <param-name>forceEncoding</param-name>
                <param-value>true</param-value>
            </init-param>
        </filter>
        <filter-mapping>
            <filter-name>CharacterEncodingFilter</filter-name>
            <url-pattern>/*</url-pattern>
        </filter-mapping>
    
        <filter>
            <filter-name>sitemesh</filter-name>
            <filter-class>com.opensymphony.sitemesh.webapp.SiteMeshFilter</filter-class>
        </filter>
    
        <filter-mapping>
            <filter-name>sitemesh</filter-name>
            <url-pattern>/*</url-pattern>
        </filter-mapping>
    
        <servlet>
            <servlet-name>appServlet</servlet-name>
            <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
            <init-param>
                <param-name>contextConfigLocation</param-name>
                <param-value>classpath:servlet-context.xml</param-value>
            </init-param>
            <load-on-startup>1</load-on-startup>
        </servlet>
    
        <servlet-mapping>
            <servlet-name>appServlet</servlet-name>
            <url-pattern>/</url-pattern>
        </servlet-mapping>
        
        <!-- 配置spring监听器 -->
        <listener>
            <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
        </listener>
        <!-- 配置监控文件变化监听器 -->
        <listener>
            <listener-class>com.zealer.ad.listener.ThreadStartUpListenser</listener-class>
        </listener>
        <listener>
            <listener-class>com.zealer.ad.listener.SessionLifecycleListener</listener-class>
        </listener>
        
        
        <jsp-config>
          <taglib>
           <taglib-uri>/tag</taglib-uri>
           <taglib-location>/WEB-INF/tag/tag.tld</taglib-location>
          </taglib>
        </jsp-config>
    
        <welcome-file-list>
            <welcome-file>index.jsp</welcome-file>
        </welcome-file-list>
        
        <session-config>
            <session-timeout>45</session-timeout>
        </session-config>
    </web-app>
    View Code

    2、编写一个ThreadStartUpListenser类,实现ServletContextListener,tomcat启动时创建后台线程

      ThreadStartUpListenser.java

    package com.zealer.ad.listener;
    
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.stereotype.Component;
    
    import com.zealer.ad.task.WatchFilePathTask;
    
    @Component
    public class ThreadStartUpListenser implements ServletContextListener
    {
        private static WatchFilePathTask r = new WatchFilePathTask();
    
        private Log log = LogFactory.getLog(ThreadStartUpListenser.class);
        
        /*
         * tomcat启动的时候创建一个线程
         * */
        @Override
        public void contextInitialized(ServletContextEvent paramServletContextEvent)
        {
            r.start();
            log.info("ImportUserFromFileTask is started!");
        }
        
        /*
         * tomcat关闭的时候销毁这个线程
         * */
        @Override
        public void contextDestroyed(ServletContextEvent paramServletContextEvent)
        {
            r.interrupt();
        }
    
    }
    View Code

    3、创建指定目录文件变化监控类

        WatchFilePathTask.java

    package com.zealer.ad.task;
    
    import java.io.File;
    import java.io.FileFilter;
    import java.io.IOException;
    import java.nio.file.FileSystems;
    import java.nio.file.Path;
    import java.nio.file.StandardWatchEventKinds;
    import java.nio.file.WatchEvent;
    import java.nio.file.WatchKey;
    import java.nio.file.WatchService;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.joda.time.DateTime;
    
    import com.zealer.ad.util.ConfigUtils;
    import com.zealer.ad.util.SpringUtils;
    /**
     * 指定目录文件变化监控类
     * @author cancer
     *
     */
    public class WatchFilePathTask extends Thread
    {
        private Log log = LogFactory.getLog(WatchFilePathTask.class);
        
        private static final String filePath = ConfigUtils.getInstance().getValue("userfile_path");
        
        private WatchService watchService;
        
        @Override
        public void run()
        {
            try
            {
                //获取监控服务
                watchService = FileSystems.getDefault().newWatchService();
                log.debug("获取监控服务"+watchService);
                Path path = FileSystems.getDefault().getPath(filePath);
                log.debug("@@@:Path:"+path);
                
                final String todayFormat = DateTime.now().toString("yyyyMMdd");
                
                File existFiles = new File(filePath);
                //启动时检查是否有未解析的符合要求的文件
                if(existFiles.isDirectory())
                {
                    File[] matchFile = existFiles.listFiles(new FileFilter()
                    {
                        
                        @Override
                        public boolean accept(File pathname)
                        {
                            if((todayFormat+".txt").equals(pathname.getName()))
                            {
                                return true;
                            }
                            else
                            {
                                return false;
                            }
                        }
                    });
                    
                    if(null != matchFile)
                    {
                        for (File file : matchFile)
                        {
                            //找到符合要求的文件,开始解析
                            ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask");
                            task.setFileName(file.getAbsolutePath());
                            task.start();
                        }
                    }
                }
    
           //注册监控服务,监控新增事件
                WatchKey key = path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
           while (true) {  
                    key = watchService.take();  
                    for (WatchEvent<?> event : key.pollEvents()) { 
                        
                        //获取目录下新增的文件名
                        String fileName = event.context().toString();
                        
                        //检查文件名是否符合要求
                        if((todayFormat+".txt").equals(fileName))
                        {
                            String filePath = path.toFile().getAbsolutePath()+File.separator+fileName;
                            log.info("import filePath:"+filePath);
                            
                            //启动线程导入用户数据
                            ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask");//new ImportUserFromFileTask(filePath);
                            task.setFileName(filePath);
                            task.start();
                            log.debug("启动线程导入用户数据"+task);
                        }
                    }  
             key.reset();  
                }  
            } catch (IOException e)
            {
                log.error(e.getMessage(),e);
            } catch (InterruptedException e)
            {
                log.error(e.getMessage(),e);
            }
        }
    }
    View Code

    4、创建解析用户文件及导入数据库线程,由WatchFilePathTask启动

    package com.zealer.ad.task;
    
    import com.zealer.ad.entity.AutoPutUser;
    import com.zealer.ad.entity.Bmsuser;
    import com.zealer.ad.service.AutoPutUserService;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import org.joda.time.DateTime;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    
    import java.util.Date;
    
    import javax.annotation.Resource;
    
    
    /**
     * 解析用户文件及入库线程,由WatchFilePathTask启动
     * @author cancer
     *
     */
    public class ImportUserFromFileTask extends Thread {
        private Log log = LogFactory.getLog(ImportUserFromFileTask.class);
        private String fileName;
        @Resource(name = "autoPutUserService")
        private AutoPutUserService autoPutUserService;
    
        @Override
        public void run() {
            File file = new File(fileName);
    
            if (file.exists() && file.isFile()) {
                log.debug(":@@@准备开始休眠10秒钟:" + file);
    
                //休眠十分钟,防止文件过大还没完全拷贝到指定目录下,这里的线程就开始读取文件
                try {
                    sleep(10000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
    
                InputStreamReader read;
    
                try {
                    read = new InputStreamReader(new FileInputStream(file), "UTF-8");
    
                    BufferedReader bufferedReader = new BufferedReader(read);
                    String lineTxt = null;
                    int count = 0;
                    Boolean f = false;
    
                    while ((lineTxt = bufferedReader.readLine()) != null) {
                        if ((null == lineTxt) || "".equals(lineTxt)) {
                            continue;
                        }
    
                        if (lineTxt.startsWith("'")) {
                            lineTxt = lineTxt.substring(1, lineTxt.length());
                        }
    
                        //解析分隔符为', '
                        String[] lines = lineTxt.split("', '");
                        int length = lines.length;
    
                        if (length < 2) {
                            continue;
                        }
    
                        Bmsuser bmsuser = new Bmsuser();
                        bmsuser.setName(lines[0]);if (!"".equals(lines[1])) {
                            bmsuser.setCity(lines[1]);
                        }
                //根据唯一索引已经存在的数据则不插入
                        f = autoPutUserService.insertIgnore(bmsuser);
    
                        if (f) {
                            count++;
                        }
                    }
    
                    //汇总数据
                    AutoPutUser autoPutUser = new AutoPutUser();
                    autoPutUser.setTotalCount(autoPutUserService.getUserCount());
                    autoPutUser.setCount(count);
                    autoPutUser.setCountDate(new Date(System.currentTimeMillis()));
    
                    String today = DateTime.now().toString("yyyy-MM-dd");
                    Integer oldCount = autoPutUserService.getOldCount(today);
    
                    //如果今天导入过了就更新否则插入
                    if (!oldCount.equals(0)) {
                        autoPutUserService.updateUserData(autoPutUser, today,
                            oldCount);
                    } else {
                        autoPutUserService.gatherUserData(autoPutUser);
                    }
    
                    //注意:要关闭流
                    read.close();
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
    
                File newFile = new File(file.getPath() +
                        System.currentTimeMillis() + ".complate");
                file.renameTo(newFile);
            } else {
                log.error(fileName + " file is not exists");
            }
        }
    
        public String getFileName() {
            return fileName;
        }
    
        public void setFileName(String fileName) {
            this.fileName = fileName;
        }
    
        public AutoPutUserService getAutoPutUserService() {
            return autoPutUserService;
        }
    
        public void setAutoPutUserService(AutoPutUserService autoPutUserService) {
            this.autoPutUserService = autoPutUserService;
        }
    }
    View Code

    附带:

    1、sql脚本

    CREATE TABLE `bmsuser` (
      `id` int(255) unsigned NOT NULL AUTO_INCREMENT,
      `name` varchar(32) DEFAULT NULL ,
      `city` varchar(32) DEFAULT NULL COMMENT ,
      PRIMARY KEY (`bmsid`),
      UNIQUE KEY `bbLoginName` (`bbLoginName`)
    ) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

    2、文件格式,命名为yyyyMMdd.txt

    '张三', '深圳'

    转载自:http://www.cnblogs.com/zishengY/p/6958564.html

  • 相关阅读:
    逻辑最复杂的MVVM模式实现
    剧本:博客园之天外飞仙
    本博客开始偏转方向,开始研究UDP在WCF下的实现
    Prism研究 目录
    Q & A category in Prism forums, with some answers and samples of mine.
    我眼中的SOA,以及在实际项目中的应用经验
    数据结构 C#描述 第三章 (更新)
    数据结构 C#描述 第四章
    数据结构 C#描述 第七章 (第一部分)
    数据结构 C#描述
  • 原文地址:https://www.cnblogs.com/blovej/p/7065841.html
Copyright © 2011-2022 走看看