zoukankan      html  css  js  c++  java
  • Flume-ng-1.4.0 spooling source的方式增加了对目录的递归检测的支持


     因为flume的spooldir不支持子目录文件的递归检测,并且业务需要,所以修改了源码,重新编译


    代码修改参考自:http://blog.csdn.net/yangbutao/article/details/8835563

    不过在1.4中已经不是修改SpoolingFileLineReader类了,而是apache-flume-1.4.0-srcflume-ng-coresrcmainjavaorgapacheflumeclientavroReliableSpoolingFileEventReader.java

    并且变量directory应该改为spoolDirectory


     1     /*
     2      * @author admln
     3      * 
     4      * @date 2015年4月8日 上午9:37:20
     5      */
     6     private void listDirFiles(List<File> files, File dir, FileFilter filter) {
     7         File[] childs = dir.listFiles(filter);
     8         for (int i = 0; i < childs.length; i++) {
     9             if (childs[i].isFile()) {
    10                 files.add(childs[i]);
    11             } else {
    12                 if (childs[i].isDirectory()) {
    13                     listDirFiles(files, childs[i], filter);
    14                 }
    15             }
    16         }
    17     }
    18 
    19     /**
    20      * Find and open the oldest file in the chosen directory. If two or more
    21      * files are equally old, the file name with lower lexicographical value is
    22      * returned. If the directory is empty, this will return an absent option.
    23      */
    24     private Optional<FileInfo> getNextFile() {
    25         /* Filter to exclude finished or hidden files */
    26         FileFilter filter = new FileFilter() {
    27          public boolean accept(File pathName) {
    28           if ((pathName.getName().endsWith(completedSuffix))
    29             || (pathName.getName().startsWith("."))) {
    30             return false;
    31           }
    32           return true;
    33         }
    34      };
    35      // List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter));
    36      List<File> candidateFiles = new ArrayList<File>();
    37      listDirFiles(candidateFiles, spoolDirectory, filter);

    很多没必要的版本就不改,如果少包或者版本不对,即使编译通过了,使用的时候也会报java.lang.Error: Unresolved compilation problem,就要修改重新编译


    重新编译的时候可以参考:http://www.iteblog.com/archives/1032
    编译命令:

    mvn install -Phadoop-2 -DskipTests -Dtar

    会下很多各种jar包,很浪费时间


    已经编译好的:http://pan.baidu.com/s/1eQxUDxC    5x9l


    还有个比较简单的一点的办法,因为改的flume-ng-core里面的代码,而flume-ng-core是flume-ng的一个子项目,所以直接在flume-ng-core里面执行单独编译,得到target/flume-ng-core-1.4.0.jar,然后替换现有flume-bin/lib/里面的flume-ng-core-1.4.0.jar,也可以实现想要的功能。这个编译基本上不会有什么问题。

    现成的:http://pan.baidu.com/s/1CVR3K  989v


    在1.5.0中的doc说已经支持子目录的递归读取了但是只有patch,没集成到代码中,也没有kafka sink,所以个人觉得干脆改CDH的flume算了,加上递归,本身有kafka sink

    http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.2.5.tar.gz


  • 相关阅读:
    WM_MOUSEWHEEL消息的处理
    Delphi之TStrings和TStringLists类[转]
    使用mysqladmin命令修改Mysql密码
    AP_发票与预付款核销时预付款带税码
    ORACLE EBS AP发票到付款的数据流
    .关于货币大写的探索
    Oracle SQL 空值排序(Nulls)
    实现主从关系Form中汇总行金额/数量
    巧妙的查看FORM中的 LOV查询语句
    供应商接口的使用
  • 原文地址:https://www.cnblogs.com/admln/p/flume-1-4-spooldir-childdrectory.html
Copyright © 2011-2022 走看看