zoukankan      html  css  js  c++  java
  • 【Java】【Fulme】Flume-NG源代码阅读之SpoolDirectorySource

            org.apache.flume.source.SpoolDirectorySource是flume的一个经常使用的source,这个源支持从磁盘中某目录获取文件数据。不同于其它异步源,这个源可以避免重新启动或者发送失败后数据丢失。flume可以监控目录,当出现新文件时会读取该文件并获取数据。当一个给定的文件被所有读入到通道中时,该文件会被重命名以标志已经完毕。同一时候,该源须要一个清理进程来定期移除完毕的文件。

      通道可选地将一个完毕路径的原始文件插入到每一个事件的hearder域中。在读取文件时,source缓存文件数据到内存中。同一时候,须要确定设置了bufferMaxLineLength选项,以确保该数据远大于输入数据中数据最长的某一行。

         注意!!!channel仅仅接收spooling directory中唯一命名的文件。假设文件名称反复或文件在读取过程中被改动,则会有读取失败返回异常信息。这样的场景下,同名的文件拷贝到这个文件夹时建议带唯一标示,比方时间戳。

         一、configure(Context context)方法。代码例如以下:

    public void configure(Context context) {
        spoolDirectory = context.getString(SPOOL_DIRECTORY);
        Preconditions.checkState(spoolDirectory != null,
            "Configuration must specify a spooling directory");
    
        completedSuffix = context.getString(SPOOLED_FILE_SUFFIX,
            DEFAULT_SPOOLED_FILE_SUFFIX);
        deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY);
        fileHeader = context.getBoolean(FILENAME_HEADER,
            DEFAULT_FILE_HEADER);
        fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
            DEFAULT_FILENAME_HEADER_KEY);
        batchSize = context.getInteger(BATCH_SIZE,
            DEFAULT_BATCH_SIZE);
        inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
    
        ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
        trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);
    
        deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
        deserializerContext = new Context(context.getSubProperties(DESERIALIZER +
            "."));
    
        // "Hack" to support backwards compatibility with previous generation of
        // spooling directory source, which did not support deserializers
        Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH);
        if (bufferMaxLineLength != null && deserializerType != null &&
            deserializerType.equals(DEFAULT_DESERIALIZER)) {
          deserializerContext.put(LineDeserializer.MAXLINE_KEY,
              bufferMaxLineLength.toString());
        }
    
      }

    1、spoolDirectory是监控文件夹,不能为空,没有默认值。这个source不具有监控子文件夹的功能,也就是不能递归监控。假设须要,这须要自己去实现,http://blog.csdn.net/yangbutao/article/details/8835563 这里有递归检測的实现;

      2、completedSuffix是文件读取完毕后给完毕文件加入的标记后缀,默认是".COMPLETED";

      3、deletePolicy这是是否删除读取完成的文件,默认是"never",就是不删除,眼下仅仅支持"never"和“IMMEDIATE”;

      4、fileHeader是否在event的Header中加入文件名称,boolean类型

      5、fileHeaderKey这是event的Header中的key,value是文件名称

      6、batchSize这个是一次处理的记录数,默认是100;

      7、inputCharset编码方式,默认是"UTF-8";

      8、ignorePattern忽略符合条件的文件名称

      9、trackerDirPath被处理文件元数据的存储文件夹,默认".flumespool"

      10、deserializerType将文件里的数据序列化成event的方式,默认是“LINE”---org.apache.flume.serialization.LineDeserializer

      11、deserializerContext这个主要用在Deserializer中设置编码方式outputCharset和文件每行最大长度maxLineLength。

      

      二、start()方法。代码例如以下:

    public void start() {
        logger.info("SpoolDirectorySource source starting with directory: {}",
            spoolDirectory);
    
        ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
        counterGroup = new CounterGroup();
    
        File directory = new File(spoolDirectory);
        try {
          reader = new ReliableSpoolingFileEventReader.Builder()
              .spoolDirectory(directory)
              .completedSuffix(completedSuffix)
              .ignorePattern(ignorePattern)
              .trackerDirPath(trackerDirPath)
              .annotateFileName(fileHeader)
              .fileNameHeader(fileHeaderKey)
              .deserializerType(deserializerType)
              .deserializerContext(deserializerContext)
              .deletePolicy(deletePolicy)
              .inputCharset(inputCharset)
              .build();
        } catch (IOException ioe) {
          throw new FlumeException("Error instantiating spooling event parser",
              ioe);
        }
    
        Runnable runner = new SpoolDirectoryRunnable(reader, counterGroup);
        executor.scheduleWithFixedDelay(
            runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
    
        super.start();
        logger.debug("SpoolDirectorySource source started");
      }

     1、构建了一个org.apache.flume.client.avro.ReliableSpoolingFileEventReader的对象reader;

      2、启动了一个每隔POLL_DELAY_MS(默认500,单位ms)运行一次SpoolDirectoryRunnable的进程;

      三、读取并发送event进程。代码例如以下:

    private class SpoolDirectoryRunnable implements Runnable {
        private ReliableSpoolingFileEventReader reader;
        private CounterGroup counterGroup;
    
        public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader,
            CounterGroup counterGroup) {
          this.reader = reader;
          this.counterGroup = counterGroup;
        }
    
        @Override
        public void run() {
          try {
            while (true) {
              List<Event> events = reader.readEvents(batchSize);  //读取batchSize个记录
              if (events.isEmpty()) {
                break;
              }
              counterGroup.addAndGet("spooler.events.read", (long) events.size());
    
              getChannelProcessor().processEventBatch(events);  //将events批量发送到channel
              reader.commit();
            }
          } catch (Throwable t) {
            logger.error("Uncaught exception in Runnable", t);
            if (t instanceof Error) {
              throw (Error) t;
            }
          }
        }
      }

      该进程实现了批量读取reader所指向的文件的数据,并发送到channel。

    四、org.apache.flume.client.avro.ReliableSpoolingFileEventReader的构造方法首先是先尝试对spoolDirectory是否有创建文件、读、写、删除等权限;然后在构造"$spoolDirectory/.flumespool/.flumespool-main.meta"元数据文件

    五、上面SpoolDirectoryRunnable.run方法中的List<Event> events = reader.readEvents(batchSize),是org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(batchSize):

     public List<Event> readEvents(int numEvents) throws IOException {
        if (!committed) {
          if (!currentFile.isPresent()) {//为空,假设Optional包括非null的引用(引用存在),返回true
            throw new IllegalStateException("File should not roll when " +
                "commit is outstanding.");
          }
          logger.info("Last read was never committed - resetting mark position.");
          currentFile.get().getDeserializer().reset();
        } else {//已经committed成功
          // Check if new files have arrived since last call
          //Returns true if this holder contains a (non-null) instance
          if (!currentFile.isPresent()) {//为空,获取下一个文件,初次调用
            currentFile = getNextFile();
          }
          // Return empty list if no new files
          if (!currentFile.isPresent()) {//为空,已经没有可读的文件了
            return Collections.emptyList();
          }
        //其他的说明是currentFile眼下还在读
        }
    
        EventDeserializer des = currentFile.get().getDeserializer();
        List<Event> events = des.readEvents(numEvents);//加入event的body
    
        /* It's possible that the last read took us just up to a file boundary.
         * If so, try to roll to the next file, if there is one. */
        if (events.isEmpty()) {
          retireCurrentFile();  //改名字
          currentFile = getNextFile();//换下一个文件
          if (!currentFile.isPresent()) {
            return Collections.emptyList();
          }
          events = currentFile.get().getDeserializer().readEvents(numEvents);//继续读,加入event的body
        }
    
        if (annotateFileName) {
          String filename = currentFile.get().getFile().getAbsolutePath();
          for (Event event : events) {
            event.getHeaders().put(fileNameHeader, filename);//加入header
          }
        }
    
        committed = false;
        lastFileRead = currentFile;
        return events;
      }

    1,committed初始化时是true,所以第一次执行就是通过getNextFile()获取当前要去读的文件。假设是空就返回空值了。

    2,使用deserializer(默认是org.apache.flume.serialization.LineDeserializer)的readEvents(numEvents)去批量读数据封装成event。

    3,如获取的批量events为空,说明这个文件读完了,须要对这个读完的文件做个“删除”(retireCurrentFile()方法,在这也会删除元数据文件),就是依据deletePolicy(删除还是加入去读完成后缀completedSuffix);可是这个本方法是有返回值的就是events,所以须要获取下一个文件,即再次执行getNextFile(),并events = currentFile.get().getDeserializer().readEvents(numEvents)

    4,是否要对这些events的Header中加入文件名称

    5,committed = false;    lastFileRead = currentFile; 并返回events。

    这种方法还有几点须要解释:

    其一、就是committed參数,此參数关系到这一批量的event是否已经正确处理完成。能够看到上面的5中所讲,每调用一次ReliableSpoolingFileEventReader.readEvents(batchSize)均会在最后将committed设置为false,可是在SpoolDirectoryRunnable.run()方法中也能够看出在调用readEvents方法后还会调用ReliableSpoolingFileEventReader.commit()方法,代码例如以下:

    /** Commit the last lines which were read. */
      @Override
      public void commit() throws IOException {
        if (!committed && currentFile.isPresent()) {
          currentFile.get().getDeserializer().mark();
          committed = true;
        }
      }

    这种方法说明满足两个条件就能够:一、向trackerFile写入读到的记录位置,mark()方法会将syncPosition写入trackerFile,而ResettableFileInputStream中的position用来暂存位置添加的,待到何时会syncPosition=position,这样是为了防止出现异常时用于恢复丢失的数据;二、将committed  = true。两个条件:一个是committed=false,这个运行完readEvents最后会置为false;二、currentFile“非空”,代表有正在读的文件。假设committed在readEvents中開始时为false,说明:一、event提交到channel时出现了问题,没有运行reader.commit;二、currentFile已经“为空”,说明没有能够读的文件。这两点也体如今readEvents開始部分,committed=false时,假设没有可读文件就会抛出异常File should not roll when commit is outstanding.";假设是在提交到channel时出问题会通过currentFile.get().getDeserializer().reset()又一次撤回到上次正确提交channel的位置,这样能够使得不丢失数据。

    其二、就是getNextFile()方法。这种方法会首先过滤检測文件夹的子文件夹(也就是不能递归)、隐藏文件(以"."开头的文件)、已经读完的文件(有completedSuffix后缀的)、符合ignorePattern的文件;然后将过滤后的文件按时间的先后顺序排序,再创建一个新的相应的元数据文件;构造一个读取文件的输入流ResettableFileInputStream,并将此输入流作为參数传递给deserializer,终于返回一个Optional.of(new FileInfo(nextFile, deserializer));

    其三、就是LineDeserializer)的readEvents(numEvents)方法。这种方法会多次(numEvents)调用LineDeserializer(默认)的readLine()获取一行数据封装成event。readLine()会通过org.apache.flume.serialization.ResettableFileInputStream.readChar()不断的去获取数据,读完正行后推断每行的长度是否超过规定值maxLineLength。readChar()方法除了不断读取一个字符外,还会记下字符的位置,等待将位置写入元数据文件里(通过deserializer.mark()写入)



  • 相关阅读:
    ARM 平台下的 SSHD 配置
    IISExpress 开放局域网访问
    qt5 交叉编译
    QT4 交叉编译
    make install 时指定安装路径
    linux 读取物理寄存器
    Windows 7 64bit Python 2 Install
    用eggjs返回xml格式数据,前端解析xml
    vue-cli项目中axios的配置文件
    webpack配置练习typescript的web项目
  • 原文地址:https://www.cnblogs.com/lcchuguo/p/4478568.html
Copyright © 2011-2022 走看看