zoukankan      html  css  js  c++  java
  • Flume 1.7 源代码分析(四)从Source写数据到Channel

    Flume 1.7 源代码分析(一)源代码编译
    Flume 1.7 源代码分析(二)总体架构
    Flume 1.7 源代码分析(三)程序入口
    Flume 1.7 源代码分析(四)从Source写数据到Channel
    Flume 1.7 源代码分析(五)从Channel获取数据写入Sink

    5 从Source写数据到Channel

    5.1 Source部分

    5.1.1 SourceRunner

    SourceRunner就是专门用于运行Source的一个类。
    在”物化配置”一节获取配置信息后,会依据Source去获取详细的SourceRunner,调用的是SourceRunner的forSource方法。

    public static SourceRunner forSource(Source source) {
      SourceRunner runner = null;
      if (source instanceof PollableSource) {
        runner = new PollableSourceRunner();
        ((PollableSourceRunner) runner).setSource((PollableSource) source);
      } else if (source instanceof EventDrivenSource) {
        runner = new EventDrivenSourceRunner();
        ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
      } else {
        throw new IllegalArgumentException("No known runner type for source " + source);
      }
      return runner;
    }

    能够看到source分为了2种类型,并有相应的sourceRunner(PollableSourceRunner、EventDrivenSourceRunner)。

    这2种source差别在于是否须要外部的驱动去获取数据,不须要外部驱动(採用自身的事件驱动机制)的称为EventDrivenSource,须要外部驱动的称为PollableSource。

    • 常见的EventDrivenSource:AvroSource、ExecSource、SpoolDirectorySource。
    • 常见的PollableSource:TaildirSource、kafkaSource、JMSSource。

    以EventDrivenSourceRunner为例,由MonitorRunnable调用其start方法:

    public void start() {
      Source source = getSource();
      ChannelProcessor cp = source.getChannelProcessor();
      cp.initialize();//用于初始化Interceptor
      source.start();
      lifecycleState = LifecycleState.START;
    }

    这里的ChannelProcessor是比較重要的一个类,后面会详细说。接下来调用了Source的start方法。能够对比一下之前的总体架构的图。start方法实现的就是这个部分:

    Flume Source start

    5.1.2 ExecSource

    以ExecSource的start方法为例:

    public void start() {
      executor = Executors.newSingleThreadExecutor();
      runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
      runnerFuture = executor.submit(runner);
      sourceCounter.start();
      super.start();
    }

    主要启动了一个线程runner。初始化了一下计数器。详细实现还是要看ExecRunable类的run方法:

    public void run() {
      do {
        timedFlushService = Executors.newSingleThreadScheduledExecutor(…);
    //使用配置的參数启动Shell命令
        String[] commandArgs = command.split("\s+");
        process = new ProcessBuilder(commandArgs).start();
    //设置标准输入流
        reader = new BufferedReader(new InputStreamReader(process.getInputStream()…));
        //设置错误流
    StderrReader stderrReader = new StderrReader(…);
        stderrReader.start();
    //启动定时任务。将eventList中数据批量写入到Channel
        future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
              synchronized (eventList) {
                if (!eventList.isEmpty() && timeout()) {flushEventBatch(eventList);}
              }
            }
        },batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
    //按行读取标准输出流的内容,并写入eventList
        while ((line = reader.readLine()) != null) {
          synchronized (eventList) {
            sourceCounter.incrementEventReceivedCount();
            eventList.add(EventBuilder.withBody(line.getBytes(charset)))
    //超出配置的大小或者超时后,将eventList写到Channel
            if (eventList.size() >= bufferCount || timeout()) {flushEventBatch(eventList);}
    }
    }
        synchronized (eventList) {if (!eventList.isEmpty()){flushEventBatch(eventList);}}
      } while (restart);//假设配置了自己主动重新启动。当Shell命令的进程结束时,自己主动重新启动命令。
    }

    在该方法中启动了2个reader,分别取读取标准输入流和错误流,将标准输入流中的内容写入eventList。

    与此同一时候启动另外一个线程,调用flushEventBatch方法。定期将eventList中的数据写入到Channel。

    private void flushEventBatch(List<Event> eventList) {
      channelProcessor.processEventBatch(eventList);//假如这里异常的话。eventList还没有清空
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

    能够看到这里调用了channelProcessor.processEventBatch()来写入Channel。

    5.2 Channel部分

    5.2.1 ChannelProcessor

    ChannelProcessor的作用是运行所有interceptor。并将eventList中的数据,发送到各个reqChannel、optChannel。ReqChannel和optChannel是通过channelSelector来获取的。

    public interface ChannelSelector extends NamedComponent, Configurable {
      public void setChannels(List<Channel> channels);
      public List<Channel> getRequiredChannels(Event event);
      public List<Channel> getOptionalChannels(Event event);
      public List<Channel> getAllChannels();//获取在当前Source中配置的所有Channel
    }

    假设要自己定义一个ChannelSelector,仅仅须要继承AbstractChannelSelector后,实现getRequiredChannels和getOptionalChannels就可以。

    ReqChannel代表一定保证存储的Channel(失败会不断重试),optChannel代表可能存储的Channel(即失败后不重试)。

    ReqChannel与optChannel的差别从代码上来看。前者在出现异常时,会在运行完回滚后往上层抛,而optChannel则仅仅运行回滚。注意到回滚操作仅仅清空putList(5.2.4节会说明),而这一层假设没有抛出异常的话,调用方(也就是上节的flushEventBatch)会清空eventList,也就是异常之后的数据丢失了。

    发送当中一条数据的代码例如以下:

    try {
      tx.begin();
      reqChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
        //省略部分代码
    }

    当中put调用Channel的doPut方法。commit调用Channel的doCommit方法。
    Channel主要包括4个主要方法:doPut、doTake、doCommit、doRollback。以下以MemoryChannel为例说明。

    5.2.2 doPut方法

    在这种方法中,仅仅包括了递增计数器和将事件加入到putList。

    protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      if (!putList.offer(event)) {
        throw new ChannelException("");
      }
      putByteCounter += eventByteSize;
    }

    假如这种方法中出现了异常,则会抛到ChannelProcessor中运行回滚操作。

    5.2.3 doCommit方法

    这种方法是比較复杂的方法之中的一个。原因在于put和take操作的commit都是通过这种方法来进行的,所以代码里面事实上混合了2个功能(即put和take操作)所需的提交代码。

    单纯从Source写数据到Channel这件事情,流程为eventList->putList->queue。

    因为前面已经完毕了把数据放到putList中。那接下来要做的事情就是将putList中数据放入queue中就能够了。

    这个部分先说明到这里,下一个章节结合take操作一起看这种方法。

    5.2.4 doRollback方法

    与doCommit方法相似,这里的回滚,也分为2种情况:由take操作引起的和由put方法引起的。

    这里先说由put发起的,该transaction的流程例如以下:
    eventList->putList->queue

    因为doPut和doCommit运行出现异常就直接跳出了,还没运行清空语句(这里能够參考“ExecSource“章节的最后一段代码的凝视部分),也就是eventList还没有清空。所以能够直接清空putList。这样下次循环还会又一次读取该eventList中的数据。

    附注:在put操作commit的时候,假设部分数据已经放进queue的话,这个时候回滚,那是否存在数据反复问题呢?依据代码。因为在放队列这个操作之前已经做过非常多推断(容量等等),这个操作仅仅是取出放进队列的操作,而这个代码之后。也仅仅是一些设置计数器的操作,理论上不会出现异常导致回滚了。

  • 相关阅读:
    CSS居中布局总结
    Jquery的普通事件和on的委托事件
    sass基础用法
    JS中常遇到的浏览器兼容问题和解决方法
    KVC
    关键字 self
    常见的出现内存循环引用的场景有哪些?
    XCODE中的蓝色文件夹与黄色文件夹
    oc 关键字
    uiwebview 兼容性
  • 原文地址:https://www.cnblogs.com/jzssuanfa/p/7380024.html
Copyright © 2011-2022 走看看