zoukankan      html  css  js  c++  java
  • flume-ng源码阅读RollingFileSink(原创)

    org.apache.flume.sink.RollingFileSink    这个类比较简单。

    source的种类有两种:一种是PollableSource;另外一种是EventDrivenSource。前者“必须有它自己的callback机制,该机制用于捕获新数据并将数据存储到通道中”,后者“不是由其自身的线程驱动”。在自定义source时,前者必须要实现process方法,通过调用这个方法将event放入channel中;后者没有这个方法,可以自由发挥。

    sink不像source,只有一种sink,需要extends AbstractSink implements Configurable。

    sink用于从通道中提取事件,并将事件传送到流中的下个flume或者将事件存储到一个外部数据仓库。一个sink与一个或者多个通道相连,在flume属性文件中配置。

    有一个SinkRunner实例与各个配置好的sink相连。当flume框架调用 SinkRunner.start(),一个新的线程产生来驱动该sink(使用SinkRunner.PollingRunner as the thread’s Runnable)。该线程管理这个sink的生命周期。sink需要实现start()和stop()方法,这两个方法是LifecycleAware接口的一部分。Sink.start()方法会初始化sink,让该sink可以将事件传输到下个目的地。Sink.process()方法是一个核心操作,它将事件从通道中提取并传输事件。Sink.stop()方法为必要的清理方法(例如释放资源)。sink的实现还需要实现Configurable 接口,用于处理其自身的参数配置。

    RollingFileSink类中需要理解的有start()方法和process()方法。

    一、start()方法

      主要作用在于启动了一个线程,用来每隔rollInterval秒就创建一个新的文件(程序启动的时间戳+文件编号为名称,配置文件动态修改后,时间戳会变动)。并且通知process可以将正在写的这个文件关闭,准备写新的文件。

      需要注意这个变量shouldRotate,初始的时候即start之前,是false的,执行start之后由于线程启动之后首次运行需要等待rollInterval秒,所以这个时间段shouldRotate一直是false,在此期间process方法会一直向一个文件写数据,直到shouldRotate=true,也就是线程每隔rollInterval秒运行之后(shouldRotate会设置为true,并且会获得下一个写入的文件名),这会周期性的运行。

    二、process方法

      这个方法会一直被重复调用。

      它会首先判断是否需要关闭当前的文件shouldRotate=true就会关闭文件。并且重新

      shouldRotate=false;//可以对下一个文件一直写

      pathController.rotate();//表示当前的文件写入完毕,并且可以准备写入下一个文件

      

      File currentFile = pathController.getCurrentFile();//在pathController.rotate()之后该方法就可以获取下一个将要写入的文件名,

      //start方法中也有这句代码,目测start中的这句代码没啥作用,因为shouldRotate = true会导致文件的滚动

      rollService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
              logger.debug("Marking time to rotate file {}",
                  pathController.getCurrentFile());//这句没啥作用。。。。。。。。
              shouldRotate = true;  //表示当前文件写满,准备写下一个文件
            }

          }, rollInterval, rollInterval, TimeUnit.SECONDS);

      

      serializer默认是org.apache.flume.serialization.BodyTextEventSerializer

      接下来就是向channel发送数据了。。。

      Status result = Status.READY;

      transaction.begin();

      event = channel.take();

      //自己的处理逻辑,本类中就是序列化到文件中serializer.write(event)

      transaction.commit();

      transaction.rollback();

      transaction.close();

    这个比较简单,可以用来熟悉sink的处理流程,以及学习如何自定义sink。

  • 相关阅读:
    微信公众平台—— 获取微信服务器IP地址
    微信公众平台——获取access_token、expires_in
    PHP版本切换
    开源各种系统
    VUE -- 如何快速的写出一个Vue的icon组件?
    Mac下php 5升级到php 7的步骤详解
    Nginx反向代理导致PHP获取不到正确的HTTP_HOST,SERVER_NAME,客户端IP的解决方法
    nginx服务器URL无法自动添加index.php
    php类库安装xml simplexml
    Mac DBeaver Client home is not specified for connection解决办法
  • 原文地址:https://www.cnblogs.com/lxf20061900/p/3643244.html
Copyright © 2011-2022 走看看