zoukankan      html  css  js  c++  java
  • flumengtaildirectorysource 修改调试可用

    由于flume-ng至1.5版本仍旧没有稳定可用的类似flume-og中的taildir的功能,所以从git中https://github.com/jinoos/flume-ng-extends找了一个别人针对flume-ng实现的的taildir这个按照github上他自己说明,是没法正常使用的。查看了源码后,做了一些相应修改

    1. 默认的DirectoryTailParserModulable类修改

    他实现了2种DirectoryTailParserModulable

      第一种是SingleLineParserModule,适用日志里只有单条记录的。并且代码中默认就是使用的这个,显然很不靠谱。提供了配置项,但是说明里没有写出来,配置项为 ‘parser’.

       第二种是MultiLineParserModule,适用多行的日志文件的。这里我们大部分情况肯定是要用这个的。

    DirectoryTailSource类中如下行

      private static final String DEFAULT_PARSER_MODULE_CLASS = "com.jinoos.flume.SingleLineParserModule";

    修改为。包名根据实际情况来更改

      private static final String DEFAULT_PARSER_MODULE_CLASS = "org.apache.flume.source.taildirectory.MultiLineParserModule";

    2. first-line-pattern配置

    这是MultiLineParserModule中的一个属性,用来验证读进来的行是否为第一行。这个说明中也没提到

    如果没有配置这个配置,那么就无法正常执行,会报“wrong log format”。

    主要代码如下:

     1         private void readMessage(FileSet fileSet) {
     2             try {
     3                 String buffer;
     4 
     5                 synchronized (fileSet) {
     6 
     7                     while ((buffer = fileSet.readLine()) != null) {
     8                         if (buffer.length() == 0) {
     9                             continue;
    10                         }
    11 
    12                         boolean isFirstLine = parserModule.isFirstLine(buffer);
    13                         if (isFirstLine) {
    14                             sendEvent(fileSet);
    15                             fileSet.appendLine(buffer);
    16                             parserModule.parse(buffer, fileSet);
    17 
    18                         } else {
    19                             if (fileSet.getLineSize() == 0) {
    20                                 logger.debug("Wrong log format, " + buffer);
    21                                 continue;
    22                             } else {
    23                                 fileSet.appendLine(buffer);
    24                                 parserModule.parse(buffer, fileSet);
    25                             }
    26                         }
    27 
    28                         if (parserModule.isLastLine(buffer)) {
    29                             sendEvent(fileSet);
    30                         }
    31                     }
    32                 }
    33             } catch (IOException e) {
    34                 logger.warn(e.getMessage(), e);
    35             }
    36         }

    根据我们的实际需求,我们不需要判断是否第一行,只要有change事件,全部写入到channel中即可

    修改为如下方式

     1         // 파일을 읽고 Event를 생성한다.
     2         private void readMessage(FileSet fileSet) {
     3             try {
     4                 String buffer;
     5 
     6                 synchronized (fileSet) {
     7 
     8                     while ((buffer = fileSet.readLine()) != null) {
     9                         if (buffer.length() == 0) {
    10                             continue;
    11                         }
    12 
    13                         fileSet.appendLine(buffer);
    14                         sendEvent(fileSet);
    15                     }
    16                 }
    17             } catch (IOException e) {
    18                 logger.warn(e.getMessage(), e);
    19             }
    20         }

    改为这种方式后,只要来一行就会send到channel中。如果需要批量的,可以按自己要求更改。

    现在就不再需要关注first-line-pattern这个配置了。

    注意:但是配置在配置文件中还是配的,虽然它没有起到任何作用。如果想不配置,请修改MultiLineParserModule的configure(Context context)方法

    3.监控文件中有中文,编码的配置添加

     目前这个版本是无法支持中文的文件的。

    正式读取数据的方法:位置FileSet类中

      public String readLine() throws IOException {
        return rReader.readLine();
      }

    这个rReader是个RandomAccessFile对象

      public FileSet(AbstractSource source, FileObject fileObject)
          throws IOException {
        this.source = source;
        this.fileObject = fileObject;
    
        this.bufferList = new ArrayList<String>();
    
        //File f = new File(fileObject.getName().getPath());
        File f = new File("d:/tmp/log_compare/test1.txt");
    
        rReader = new RandomAccessFile(f, "r");
        rReader.seek(f.length());
    
        bufferList = new ArrayList<String>();
        headers = new HashMap<String, String>();
        logger.debug("FileSet has been created " + fileObject.getName().getPath());
        this.seq = 0L;
      }

    在FileSet类实例化时创建。

    下面开始修改操作,源代码中是直接使用了RandomAccessFile的readline()方法,修改为按byte读取的方式

      /**
       * 
       * @Title: readLine
       * @Description: TODO(读取文件中的一行)
       * @param @throws IOException    设定文件
       * @return String    返回类型
       * @throws
       */
      public String readLine() throws IOException {
        if(rReader.getFilePointer() < rReader.length()) {
            byte b = rReader.readByte();//读取一个byte
            int i = 0;
            byte[] buf = new byte[10240];//创建大小为1M的数据,如果你的单行超过1M,那么会出错
            //如果读到换行符,或者读到文件最后就停止。表示已经读完一行
            while(b != '\n' && rReader.getFilePointer() < rReader.length()) {
                buf[i++] = b;
                b = rReader.readByte();
                
            }
            return new String(buf,0,i);
        }else{
            return "";
        }
      }

    改完后重新打包再次测试,发现已经可以支持中文了。

    4.每次新文件刚被创建时会丢失第一条数据

    代码如下

            public void run() {
                while (true) {
                    try {
                        // DirectoryTailEvent event = eventQueue.poll(
                        // eventQueueWorkerTimeoutMiliSecond,
                        // TimeUnit.MILLISECONDS);
                        DirectoryTailEvent event = eventQueue.take();
    
                        if (event == null) {
                            continue;
                        }
    
                        if (event.type == FileEventType.FILE_CHANGED) {
                            fileChanged(event.event);
                        } else if (event.type == FileEventType.FILE_CREATED) {
                            fileCreated(event.event);
                        } else if (event.type == FileEventType.FILE_DELETED) {
                            fileDeleted(event.event);
                        } else if (event.type == FileEventType.FLUSH) {
                            if (event.fileSet != null)
                                sendEvent(event.fileSet);
                        }
                    } catch (InterruptedException e) {
                        logger.debug(e.getMessage(), e);
                    } catch (FileSystemException e) {
                        logger.info(e.getMessage(), e);
                    }
                }
            }

    上面这段代码为监测的文件夹有新的事件时的处理。这里我们要看的是FILE_CREATE事件,他调用了fileCreated(event.event);

     1     private void fileCreated(FileChangeEvent event)
     2                 throws FileSystemException {
     3             String path = event.getFile().getName().getPath();
     4             String dirPath = event.getFile().getParent().getName().getPath();
     5 
     6             logger.debug(path + " has been created.");
     7 
     8             DirPattern dirPattern = null;
     9             dirPattern = pathMap.get(dirPath);
    10 
    11             if (dirPattern == null) {
    12                 logger.warn("Occurred create event from un-indexed directory. "
    13                         + dirPath);
    14                 return;
    15             }
    16 
    17             // 파일명이 대상인지 검사한다.
    18             if (!isInFilePattern(event.getFile(), dirPattern.getFilePattern())) {
    19                 logger.debug(path + " is not in file pattern.");
    20                 return;
    21             }
    22 
    23             FileSet fileSet;
    24 
    25             fileSet = fileSetMap.get(event.getFile().getName().getPath());
    26             //fileSet = fileSetMap.get(path);
    27             if (fileSet == null) {
    28                 try {
    29                     logger.info(path
    30                             + " is not in monitoring list. It's going to be listed.");
    31                     
    32                     fileSet = new FileSet(source, event.getFile());
    33                     // a little synchronized bug here.fixed by tqli,2014-08-07
    34                     // ,E-mail:tiangang1126@126.com
    35                     synchronized (fileSetMap) {
    36                         fileSetMap.put(path, fileSet);
    37                     }
    38                 } catch (IOException e) {
    39                     logger.error(e.getMessage(), e);
    40                     return;
    41                 }
    42             }
    43         }

    看第27行,当新的文件进来,需要创建一个fileSet对象。将这个fileSet对象存入fileSetMap中

    看fileSet实例化的方法,上面已经贴过了

     1   public FileSet(AbstractSource source, FileObject fileObject)
     2       throws IOException {
     3     this.source = source;
     4     this.fileObject = fileObject;
     5 
     6     this.bufferList = new ArrayList<String>();
     7 
     8     File f = new File(fileObject.getName().getPath());
     9     //File f = new File("d:/tmp/log_compare/test1.txt");
    10     rReader = new RandomAccessFile(f, "r");
    11     rReader.seek(f.length());
    12     bufferList = new ArrayList<String>();
    13     headers = new HashMap<String, String>();
    14     logger.debug("FileSet has been created " + fileObject.getName().getPath());
    15     logger.debug("file length now is : " + f.length());
    16     this.seq = 0L;
    17   }

    注意看第11行,将游标移到到f.length的位置,这样的问题就是跟着文件新建时写入的内容,全部被忽略了。这样就造成了数据丢失

    那怎么解决这个问题呢,简单的改为

    rReader.seek(0);
    肯定是不行的,具体的原因,大家自己思考下吧。

    我们目的的就是在有监控新的事件时,创建的fileSet,游标位置能在文件原来的的位置。
    需求明确了,下面就知道该做哪些事了。
    1 首先在DirectoryTailSource中start方法执行时,将配置监控文件下符合正则条件文件的length都保存在一个Map里
    2 在监听到新事件新建fileSet时,判断这个文件是新建的还是之前就存在的,如果是之前就存在的,那么就可以直接取之前记下的这个文件的大小。如果不存在,说明这个文件是个新文件,则从0位置开始读
    注意:这个不支持文件更改的情况,只能适应只对文件做增加的场景
    下面是代码修改的部分

    DirectoryTailSource类
    添加 fileInitLengthMap 属性
    1     private Map<String, DirPattern> dirMap;
    2     private Map<String, DirPattern> pathMap;
    3     private Map<String,Long> fileInitLengthMap;//文件初始大小记录,用来定位新建fileSet时的游标初始位置
    在configure方法中实例化fileInitLengthMap
        public void configure(Context context) {
            logger.info("Source Configuring..");
    
            dirMap = new HashMap<String, DirPattern>();
            pathMap = new HashMap<String, DirPattern>();
            fileInitLengthMap = new HashMap<String,Long>();

    在start方法中初始化fileInitLengthMap。保存全部符合正则条件的文件大小。红色部分为添加的代码

     1     public void start() {
     2         logger.info("Source Starting..");
     3 
     4         if (sourceCounter == null) {
     5             sourceCounter = new SourceCounter(getName());
     6         }
     7 
     8         fileSetMap = new Hashtable<String, FileSet>();
     9 
    10         try {
    11             fsManager = VFS.getManager();
    12         } catch (FileSystemException e) {
    13             logger.error(e.getMessage(), e);
    14             return;
    15         }
    16 
    17         monitorRunnable = new MonitorRunnable();
    18 
    19         fileMonitor = new DefaultFileMonitor(monitorRunnable);
    20         fileMonitor.setRecursive(false);
    21 
    22         FileObject fileObject;
    23 
    24         logger.debug("Dirlist count " + dirMap.size());
    25         for (Entry<String, DirPattern> entry : dirMap.entrySet()) {
    26             logger.debug("Scan dir " + entry.getKey());
    27 
    28             DirPattern dirPattern = entry.getValue();
    29 
    30             try {
    31                 fileObject = fsManager.resolveFile(dirPattern.getPath());
    32             } catch (FileSystemException e) {
    33                 logger.error(e.getMessage(), e);
    34                 continue;
    35             }
    36 
    37             try {
    38                 if (!fileObject.isReadable()) {
    39                     logger.warn("No have readable permission, "
    40                             + fileObject.getURL());
    41                     continue;
    42                 }
    43 
    44                 if (FileType.FOLDER != fileObject.getType()) {
    45                     logger.warn("Not a directory, " + fileObject.getURL());
    46                     continue;
    47                 }
    48 
    49                 // 폴더를 Monitoring 대상에 추가한다.
    50                 fileMonitor.addFile(fileObject);
    51                 logger.debug(fileObject.getName().getPath()
    52                         + " directory has been add in monitoring list");
    53                 pathMap.put(fileObject.getName().getPath(), entry.getValue());
    54                 //pathMap.put("d:/tmp/log_compare", entry.getValue());
    55                 //新增部分,文件初始化大小保存
    56                 FileObject[] allChiledfile = fileObject.getChildren();
    57                 for(FileObject chiledFileobject : allChiledfile) {
    58                     if(dirPattern.getFilePattern().matcher(chiledFileobject.getName().getBaseName()).find()) {
    59                         String chiledFildPath = chiledFileobject.getName().getPath();
    60                         //String chiledFildPath = "d:/tmp/log_compare/test1.txt";
    61                         File chiledfile = new File(chiledFildPath);
    62                         fileInitLengthMap.put(chiledFildPath, 
    63                                 chiledfile.length());
    64                         logger.debug(chiledFildPath + " init length is :" + chiledfile.length());
    65                     }
    66                 }
    67             } catch (FileSystemException e) {
    68                 logger.warn(e.getMessage(), e);
    69                 continue;
    70             } catch (Exception e) {
    71                 logger.debug(e.getMessage(), e);
    72             }
    73 
    74         }
    75 
    76         executorService = Executors
    77                 .newFixedThreadPool(eventQueueWorkerSize + 1);
    78         monitorFuture = executorService.submit(monitorRunnable);
    79 
    80         for (int i = 0; i < eventQueueWorkerSize; i++) {
    81             workerFuture[i] = executorService.submit(new WorkerRunnable(this));
    82         }
    83 
    84         sourceCounter.start();
    85         super.start();
    86     }

    FileSet类

     1   public FileSet(AbstractSource source, FileObject fileObject,Map<String,Long> fileInitLengthMap)
     2       throws IOException {
     3     this.source = source;
     4     this.fileObject = fileObject;
     5 
     6     this.bufferList = new ArrayList<String>();
     7 
     8     File f = new File(fileObject.getName().getPath());
     9     rReader = new RandomAccessFile(f, "r");
    10     /*
    11      *判断在初始化taildirSource时,这个文件是否存在,如果存在则游标定位当时记录下的文件长度开始
    12      *如果不存在,则说明这是一个新建的文件,游标从0开始
    13      */
    14     if(fileInitLengthMap.containsKey(fileObject.getName().getPath())) {
    15         rReader.seek(fileInitLengthMap.get(fileObject.getName().getPath()));
    16     }else{
    17         rReader.seek(0);
    18     }
    19     
    20     bufferList = new ArrayList<String>();
    21     headers = new HashMap<String, String>();
    22     logger.debug("FileSet has been created " + fileObject.getName().getPath());
    23     logger.debug("file length now is : " + f.length());
    24     this.seq = 0L;
    25   }

    修改类实例化的方法。并修改DirectoryTailSource类中调用FileSet实例化方法的地方。

    至此修改全部全部完成。

    没找到能上传附件的地方,改完的jar包就不提供了。

    此为一个使用这个jar的例子

    a.sources = sources
    a.sinks = sinks
    a.channels = c
    
    #configure sources
    a.sources.sources.type = org.apache.flume.source.taildirectory.DirectoryTailSource
    a.sources.sources.dirs = s0
    #a.sources.sources.dirs.s0.path = /usr/local/nginx/logs/
    a.sources.sources.dirs.s0.path = /home/flume/testTailDir
    a.sources.sources.dirs.s0.file-pattern = ^access_.*log$
    a.sources.sources.first-line-pattern = ^(.*)$
    #congfigure sinks
    a.sinks.sinks.type = file_roll
    a.sinks.sinks.sink.directory = /home/flume/testTailDir2
    a.sinks.sinks.sink.rollInterval = 30
    a.sinks.sinks.channel = c
    #configure channals
    a.channels.c.type = memory
    #bind channel
    a.sources.sources.channels = c
  • 相关阅读:
    设计模式 ( 十七) 状态模式State(对象行为型)
    Intellij13 IDEA常用快捷键 (mac 10.5 +),优化,使用出现的问题汇总
    Web服务器及Web应用服务器
    阮一峰的网络日志
    双击退出的实现
    完成3DM以后的总结(2).Xutils的简单使用
    完成3DM以后的总结(1).PullToRefresh
    软考之路之j2se总结
    2013-2014年终总结
    牛腩新闻发布系统之获取IP
  • 原文地址:https://www.cnblogs.com/pingjie/p/4180788.html
Copyright © 2011-2022 走看看