zoukankan      html  css  js  c++  java
  • Flume 1.7 源代码分析(四)从Source写数据到Channel

    Flume 1.7 源代码分析(一)源代码编译
    Flume 1.7 源代码分析(二)总体架构
    Flume 1.7 源代码分析(三)程序入口
    Flume 1.7 源代码分析(四)从Source写数据到Channel
    Flume 1.7 源代码分析(五)从Channel获取数据写入Sink

    5 从Source写数据到Channel

    5.1 Source部分

    5.1.1 SourceRunner

    SourceRunner就是专门用于运行Source的一个类。
    在”物化配置”一节获取配置信息后,会依据Source去获取详细的SourceRunner,调用的是SourceRunner的forSource方法。

    public static SourceRunner forSource(Source source) {
      SourceRunner runner = null;
      if (source instanceof PollableSource) {
        runner = new PollableSourceRunner();
        ((PollableSourceRunner) runner).setSource((PollableSource) source);
      } else if (source instanceof EventDrivenSource) {
        runner = new EventDrivenSourceRunner();
        ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
      } else {
        throw new IllegalArgumentException("No known runner type for source " + source);
      }
      return runner;
    }

    能够看到source分为了2种类型,并有相应的sourceRunner(PollableSourceRunner、EventDrivenSourceRunner)。

    这2种source差别在于是否须要外部的驱动去获取数据,不须要外部驱动(採用自身的事件驱动机制)的称为EventDrivenSource,须要外部驱动的称为PollableSource。

    • 常见的EventDrivenSource:AvroSource、ExecSource、SpoolDirectorySource。
    • 常见的PollableSource:TaildirSource、kafkaSource、JMSSource。

    以EventDrivenSourceRunner为例,由MonitorRunnable调用其start方法:

    public void start() {
      Source source = getSource();
      ChannelProcessor cp = source.getChannelProcessor();
      cp.initialize();//用于初始化Interceptor
      source.start();
      lifecycleState = LifecycleState.START;
    }

    这里的ChannelProcessor是比較重要的一个类,后面会详细说。接下来调用了Source的start方法。能够对比一下之前的总体架构的图。start方法实现的就是这个部分:

    Flume Source start

    5.1.2 ExecSource

    以ExecSource的start方法为例:

    public void start() {
      executor = Executors.newSingleThreadExecutor();
      runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
      runnerFuture = executor.submit(runner);
      sourceCounter.start();
      super.start();
    }

    主要启动了一个线程runner。初始化了一下计数器。详细实现还是要看ExecRunable类的run方法:

    public void run() {
      do {
        timedFlushService = Executors.newSingleThreadScheduledExecutor(…);
    //使用配置的參数启动Shell命令
        String[] commandArgs = command.split("\s+");
        process = new ProcessBuilder(commandArgs).start();
    //设置标准输入流
        reader = new BufferedReader(new InputStreamReader(process.getInputStream()…));
        //设置错误流
    StderrReader stderrReader = new StderrReader(…);
        stderrReader.start();
    //启动定时任务。将eventList中数据批量写入到Channel
        future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
              synchronized (eventList) {
                if (!eventList.isEmpty() && timeout()) {flushEventBatch(eventList);}
              }
            }
        },batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
    //按行读取标准输出流的内容,并写入eventList
        while ((line = reader.readLine()) != null) {
          synchronized (eventList) {
            sourceCounter.incrementEventReceivedCount();
            eventList.add(EventBuilder.withBody(line.getBytes(charset)))
    //超出配置的大小或者超时后,将eventList写到Channel
            if (eventList.size() >= bufferCount || timeout()) {flushEventBatch(eventList);}
    }
    }
        synchronized (eventList) {if (!eventList.isEmpty()){flushEventBatch(eventList);}}
      } while (restart);//假设配置了自己主动重新启动。当Shell命令的进程结束时,自己主动重新启动命令。
    }

    在该方法中启动了2个reader,分别取读取标准输入流和错误流,将标准输入流中的内容写入eventList。

    与此同一时候启动另外一个线程,调用flushEventBatch方法。定期将eventList中的数据写入到Channel。

    private void flushEventBatch(List<Event> eventList) {
      channelProcessor.processEventBatch(eventList);//假如这里异常的话。eventList还没有清空
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

    能够看到这里调用了channelProcessor.processEventBatch()来写入Channel。

    5.2 Channel部分

    5.2.1 ChannelProcessor

    ChannelProcessor的作用是运行所有interceptor。并将eventList中的数据,发送到各个reqChannel、optChannel。ReqChannel和optChannel是通过channelSelector来获取的。

    public interface ChannelSelector extends NamedComponent, Configurable {
      public void setChannels(List<Channel> channels);
      public List<Channel> getRequiredChannels(Event event);
      public List<Channel> getOptionalChannels(Event event);
      public List<Channel> getAllChannels();//获取在当前Source中配置的所有Channel
    }

    假设要自己定义一个ChannelSelector,仅仅须要继承AbstractChannelSelector后,实现getRequiredChannels和getOptionalChannels就可以。

    ReqChannel代表一定保证存储的Channel(失败会不断重试),optChannel代表可能存储的Channel(即失败后不重试)。

    ReqChannel与optChannel的差别从代码上来看。前者在出现异常时,会在运行完回滚后往上层抛,而optChannel则仅仅运行回滚。注意到回滚操作仅仅清空putList(5.2.4节会说明),而这一层假设没有抛出异常的话,调用方(也就是上节的flushEventBatch)会清空eventList,也就是异常之后的数据丢失了。

    发送当中一条数据的代码例如以下:

    try {
      tx.begin();
      reqChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
        //省略部分代码
    }

    当中put调用Channel的doPut方法。commit调用Channel的doCommit方法。
    Channel主要包括4个主要方法:doPut、doTake、doCommit、doRollback。以下以MemoryChannel为例说明。

    5.2.2 doPut方法

    在这种方法中,仅仅包括了递增计数器和将事件加入到putList。

    protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();
      int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
      if (!putList.offer(event)) {
        throw new ChannelException("");
      }
      putByteCounter += eventByteSize;
    }

    假如这种方法中出现了异常,则会抛到ChannelProcessor中运行回滚操作。

    5.2.3 doCommit方法

    这种方法是比較复杂的方法之中的一个。原因在于put和take操作的commit都是通过这种方法来进行的,所以代码里面事实上混合了2个功能(即put和take操作)所需的提交代码。

    单纯从Source写数据到Channel这件事情,流程为eventList->putList->queue。

    因为前面已经完毕了把数据放到putList中。那接下来要做的事情就是将putList中数据放入queue中就能够了。

    这个部分先说明到这里,下一个章节结合take操作一起看这种方法。

    5.2.4 doRollback方法

    与doCommit方法相似,这里的回滚,也分为2种情况:由take操作引起的和由put方法引起的。

    这里先说由put发起的,该transaction的流程例如以下:
    eventList->putList->queue

    因为doPut和doCommit运行出现异常就直接跳出了,还没运行清空语句(这里能够參考“ExecSource“章节的最后一段代码的凝视部分),也就是eventList还没有清空。所以能够直接清空putList。这样下次循环还会又一次读取该eventList中的数据。

    附注:在put操作commit的时候,假设部分数据已经放进queue的话,这个时候回滚,那是否存在数据反复问题呢?依据代码。因为在放队列这个操作之前已经做过非常多推断(容量等等),这个操作仅仅是取出放进队列的操作,而这个代码之后。也仅仅是一些设置计数器的操作,理论上不会出现异常导致回滚了。

  • 相关阅读:
    Postman使用教程
    CAD和ArcGIS转换 矢量配准
    SAP CRM Advanced search和Simple search里Max hit表现行为的差异
    SAP CRM Product simple search的启用步骤
    如何快速定位SAP CRM订单应用(Order Application)错误消息抛出的准确位置
    如何动态修改SAP CRM WebClient UI表格栏的宽度
    如何在SAP CRM WebClient UI里创建web service并使用ABAP消费
    如何处理SAP CRM Web Service错误
    如何使用SAP CRM WebClient UI实现一个类似新浪微博的字数统计器
    如何开启SAP CRM基于WORD模板创建附件的功能
  • 原文地址:https://www.cnblogs.com/jzssuanfa/p/7380024.html
Copyright © 2011-2022 走看看