zoukankan      html  css  js  c++  java
  • flume-ng源码阅读RollingFileSink(原创)

    org.apache.flume.sink.RollingFileSink    这个类比较简单。

    source的种类有两种:一种是PollableSource;另外一种是EventDrivenSource。前者“必须有它自己的callback机制,该机制用于捕获新数据并将数据存储到通道中”,后者“不是由其自身的线程驱动”。在自定义source时,前者必须要实现process方法,通过调用这个方法将event放入channel中;后者没有这个方法,可以自由发挥。

    sink不像source,只有一种sink,需要extends AbstractSink implements Configurable。

    sink用于从通道中提取事件,并将事件传送到流中的下个flume或者将事件存储到一个外部数据仓库。一个sink与一个或者多个通道相连,在flume属性文件中配置。

    有一个SinkRunner实例与各个配置好的sink相连。当flume框架调用 SinkRunner.start(),一个新的线程产生来驱动该sink(使用SinkRunner.PollingRunner as the thread’s Runnable)。该线程管理这个sink的生命周期。sink需要实现start()和stop()方法,这两个方法是LifecycleAware接口的一部分。Sink.start()方法会初始化sink,让该sink可以将事件传输到下个目的地。Sink.process()方法是一个核心操作,它将事件从通道中提取并传输事件。Sink.stop()方法为必要的清理方法(例如释放资源)。sink的实现还需要实现Configurable 接口,用于处理其自身的参数配置。

    RollingFileSink类中需要理解的有start()方法和process()方法。

    一、start()方法

      主要作用在于启动了一个线程,用来每隔rollInterval秒就创建一个新的文件(程序启动的时间戳+文件编号为名称,配置文件动态修改后,时间戳会变动)。并且通知process可以将正在写的这个文件关闭,准备写新的文件。

      需要注意这个变量shouldRotate,初始的时候即start之前,是false的,执行start之后由于线程启动之后首次运行需要等待rollInterval秒,所以这个时间段shouldRotate一直是false,在此期间process方法会一直向一个文件写数据,直到shouldRotate=true,也就是线程每隔rollInterval秒运行之后(shouldRotate会设置为true,并且会获得下一个写入的文件名),这会周期性的运行。

    二、process方法

      这个方法会一直被重复调用。

      它会首先判断是否需要关闭当前的文件shouldRotate=true就会关闭文件。并且重新

      shouldRotate=false;//可以对下一个文件一直写

      pathController.rotate();//表示当前的文件写入完毕,并且可以准备写入下一个文件

      

      File currentFile = pathController.getCurrentFile();//在pathController.rotate()之后该方法就可以获取下一个将要写入的文件名,

      //start方法中也有这句代码,目测start中的这句代码没啥作用,因为shouldRotate = true会导致文件的滚动

      rollService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
              logger.debug("Marking time to rotate file {}",
                  pathController.getCurrentFile());//这句没啥作用。。。。。。。。
              shouldRotate = true;  //表示当前文件写满,准备写下一个文件
            }

          }, rollInterval, rollInterval, TimeUnit.SECONDS);

      

      serializer默认是org.apache.flume.serialization.BodyTextEventSerializer

      接下来就是向channel发送数据了。。。

      Status result = Status.READY;

      transaction.begin();

      event = channel.take();

      //自己的处理逻辑,本类中就是序列化到文件中serializer.write(event)

      transaction.commit();

      transaction.rollback();

      transaction.close();

    这个比较简单,可以用来熟悉sink的处理流程,以及学习如何自定义sink。

  • 相关阅读:
    我的知识库(4) java获取页面编码(Z)
    知识库(3)JAVA 正则表达式 (超详细)
    The Struts dispatcher cannot be found. This is usually caused by using Struts tags without the associated filter. Struts
    某人总结的《英语听力的技巧 》,挺搞的
    我的知识库(5)java单例模式详解
    构建可扩展程序
    SerialPort (RS232 Serial COM Port) in C# .NET
    Python学习笔记——String、Sequences
    UI题目我的答案
    jQuery学习系列学会操纵Form表单元素(1)
  • 原文地址:https://www.cnblogs.com/lxf20061900/p/3643244.html
Copyright © 2011-2022 走看看