zoukankan      html  css  js  c++  java
  • Flume Spool Source 源码过程分析(未运行)

    主要涉及到的类:

    SpoolDirectorySource 读取用户配置,并按照batchSize去读取这么多量的Event从用户指定的Spooling Dir中。SpoolDirectorySource 不会去读取某一个具体的文件,而是通过内部的reader去读取。文件切换等操作,都是reader去实现
    内部类:SpoolDirectoryRunnable是一个线程,其中的run方法,完成从Spooling Dir读取Event(使用reader去读取)
     
     1 @Override
     2     public void run() {
     3       int backoffInterval = 250;
     4       try {
     5         while (!Thread.interrupted()) {
     6           List<Event> events = reader.readEvents(batchSize);
     7           if (events.isEmpty()) {
     8             break;
     9           }
    10           sourceCounter.addToEventReceivedCount(events.size());
    11           sourceCounter.incrementAppendBatchReceivedCount();
    12 
    13           try {
    14             getChannelProcessor().processEventBatch(events);
    15             reader.commit();
    16           } catch (ChannelException ex) {
    17             logger.warn("The channel is full, and cannot write data now. The " +
    18               "source will try again after " + String.valueOf(backoffInterval) +
    19               " milliseconds");
    20             hitChannelException = true;
    21             if (backoff) {
    22               TimeUnit.MILLISECONDS.sleep(backoffInterval);
    23               backoffInterval = backoffInterval << 1;
    24               backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
    25                                 backoffInterval;
    26             }
    27             continue;
    28           }
    29           backoffInterval = 250;
    30           sourceCounter.addToEventAcceptedCount(events.size());
    31           sourceCounter.incrementAppendBatchAcceptedCount();
    32         }
    33         logger.info("Spooling Directory Source runner has shutdown.");
    34       } catch (Throwable t) {
    35         logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
    36             "Uncaught exception in SpoolDirectorySource thread. " +
    37             "Restart or reconfigure Flume to continue processing.", t);
    38         hasFatalError = true;
    39         Throwables.propagate(t);
    40       }
    41     }
     
    ReliableSpoolingFileEventReader 定义在SpoolDirectorySource中的reader。看这个名字就知道碉堡了,reliable的,怎么实现reliable的??
    reader的readEvent方法,会根据batchSize大小读取指定的Event
     
    该方法的大致意思:
    如果没有提交,如果当前文件空,错,否则获取EventDeserializer
    如果已经提交,如果当前文件空,则获得下一个文件,之后,如果文件还是空,则返回空Event列表。
     
    之后,调用EventDeserializer的readEvents。
     
     1  public List<Event> readEvents(int numEvents) throws IOException {
     2     if (!committed) {
     3       if (!currentFile.isPresent()) {
     4         throw new IllegalStateException("File should not roll when " +
     5             "commit is outstanding.");
     6       }
     7       logger.info("Last read was never committed - resetting mark position.");
     8       currentFile.get().getDeserializer().reset();
     9     } else {
    10       // Check if new files have arrived since last call
    11       if (!currentFile.isPresent()) {
    12         currentFile = getNextFile();
    13       }
    14       // Return empty list if no new files
    15       if (!currentFile.isPresent()) {
    16         return Collections.emptyList();
    17       }
    18     }
    19 
    20     EventDeserializer des = currentFile.get().getDeserializer();
    21     List<Event> events = des.readEvents(numEvents);
    22 
    23     /* It's possible that the last read took us just up to a file boundary.
    24      * If so, try to roll to the next file, if there is one. */
    25     if (events.isEmpty()) {
    26       retireCurrentFile();
    27       currentFile = getNextFile();
    28       if (!currentFile.isPresent()) {
    29         return Collections.emptyList();
    30       }
    31       events = currentFile.get().getDeserializer().readEvents(numEvents);
    32     }
    33 
    34     //写入header值,略47 
    48     committed = false;
    49     lastFileRead = currentFile;
    50     return events;
    51   }

     在这个方法中,我们看到了

    currentFile:该对象采用了谷歌的Optional进行封装,更加容易判断空指针等等。Optional<FileInfo>,该FileInfo封装了普通的File对象和针对该file对象的EventDeserializer(事件序列器)

    该currentFile主要在ReliableSpoolingFileEventReader 类中的Optional<FileInfo> openFile(File file),Optional<FileInfo> getNextFile() 方法中调用。

    EventDeserializer:事件序列器的主要作用在于定义一些读取的基本操作

    其中mark是读取的行position进行标记

    EventDeserializer的实现子类,很多,这里只讲LineDeserializer,顾名思义,按照行去读取,一行就是一个Event

    虽然EventDeserializer已经涉及到读取行了,但是真正读取记录的还不是他。

    我们看openfile函数中

     1 String nextPath = file.getPath();
     2       PositionTracker tracker =
     3           DurablePositionTracker.getInstance(metaFile, nextPath);
     4       if (!tracker.getTarget().equals(nextPath)) {
     5         tracker.close();
     6         deleteMetaFile();
     7         tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
     8       }
    15       ResettableInputStream in =
    16           new ResettableFileInputStream(file, tracker,
    17               ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
    18               decodeErrorPolicy);
    19       EventDeserializer deserializer = EventDeserializerFactory.getInstance
    20           (deserializerType, deserializerContext, in);

    因此可以看出EventDeserializer读取记录是靠 ResettableFileInputStream(in对象),ResettableFileInputStream的初始化需要File类和一个DurablePositionTracker,

    因此,ResettableFileInputStream在读取File内容同时,使用DurablePositionTracker去记录position的信息。

    DurablePositionTracker使用了apache avro来进行持久化

    private final DataFileWriter<TransferStateFileMeta> writer;
    private final DataFileReader<TransferStateFileMeta> reader;

    这样,当我们使用EventDeserializer读取一个event的时候,就会从当前文件流中获取信息,同时也能够记录读取的位置信息。

    当读取batchsize数量的event都正确处理后,ReliableSpoolingFileEventReader 会commit(),持久化位置信息

     public void commit() throws IOException {
        if (!committed && currentFile.isPresent()) {
          currentFile.get().getDeserializer().mark();
          committed = true;
        }
      }

    这里的mark方法,调用

    LineDeserializer的

     @Override
      public void mark() throws IOException {
        ensureOpen();
        in.mark();
      }

    在调用ResettableFileInputStream(in)的mark方法

     @Override
      public void mark() throws IOException {
        tracker.storePosition(tell());
      }

    之后调用位置tracker的storePostition方法(DurablePositionTracker)

     @Override
      public synchronized void storePosition(long position) throws IOException {
        metaCache.setOffset(position);
        writer.append(metaCache);
        writer.sync();
        writer.flush();
      }

    之后,调用avro的DataFileWriter,完成写入操作。

    最后,至于postition位置的持久化逻辑判断,基本也能猜到,当出现trash时候,从未读取的地方开始读取,等等,所以说,是ResettableFileInputStream的输入流,因为他能够读取信息,也能持久化读取的信息位置。

  • 相关阅读:
    栈 队列 递归 算法的GO语言实现
    数组与链表算法
    树算法的GO语言实现
    排序算法的GO语言实现
    如何抵抗记忆的衰退
    C程序运行的背后(2)
    C程序运行的背后(1)
    Shell脚本:“syntax error:unexpected end of file”
    电线的粗细与电流的大小怎么算?电流的大小与电器的功率有什么关系? 如何根据电流的大小选择铜质电线的粗细
    Word2019发布文章到博客园
  • 原文地址:https://www.cnblogs.com/kanliwei/p/4288391.html
Copyright © 2011-2022 走看看