  Flume-ng Source Code Analysis: The Source Component

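    If you haven't read the earlier posts in the Flume-ng source code analysis series, which cover the startup process, the Channel component, and the Sink component, you can find them here:
    Flume-ng Source Code Analysis: The Startup Process
    Flume-ng Source Code Analysis: The Channel Component
    Flume-ng Source Code Analysis: The Sink Component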

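    The previous three posts gave a first look at Flume's startup process, the Channel component, and the Sink component. This post covers the last of the agent's three core components: the Source.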

    1 Source

    The Source is the component through which messages enter the agent. Let's look at how it hands events to a channel, and at its main characteristics.

    As before, let's start with the code:

    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public interface Source extends LifecycleAware, NamedComponent {
      public void setChannelProcessor(ChannelProcessor channelProcessor);
      public ChannelProcessor getChannelProcessor();
    }
    

    The interface defines only two methods for implementations to provide: getChannelProcessor and setChannelProcessor. This already suggests that a source passes its events to channels through a ChannelProcessor.
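    To make this division of labour concrete, here is a minimal sketch that mirrors the shape of the Source interface. SimpleEvent, SimpleChannelProcessor, MiniSource and LineSource are hypothetical stand-ins, not Flume classes, and the "channel" is just an in-memory list. The sketch only shows that the source never touches a channel directly and always goes through the processor it was given.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for Flume's Event.
final class SimpleEvent {
  final byte[] body;
  final Map<String, String> headers;

  SimpleEvent(byte[] body, Map<String, String> headers) {
    this.body = body;
    this.headers = headers;
  }
}

// Hypothetical stand-in for ChannelProcessor: it simply appends to an
// in-memory list that plays the role of a channel.
final class SimpleChannelProcessor {
  final List<SimpleEvent> channel = new ArrayList<>();

  void processEvent(SimpleEvent event) {
    channel.add(event);
  }
}

// Mirrors the shape of Flume's Source interface: the source only holds a
// reference to its processor.
interface MiniSource {
  void setChannelProcessor(SimpleChannelProcessor processor);
  SimpleChannelProcessor getChannelProcessor();
}

final class LineSource implements MiniSource {
  private SimpleChannelProcessor processor;

  @Override
  public void setChannelProcessor(SimpleChannelProcessor processor) {
    this.processor = processor;
  }

  @Override
  public SimpleChannelProcessor getChannelProcessor() {
    return processor;
  }

  // Called whenever the source "receives" data: wrap it in an event and
  // hand it to the processor instead of writing to a channel itself.
  void onLine(String line) {
    getChannelProcessor().processEvent(
        new SimpleEvent(line.getBytes(StandardCharsets.UTF_8), Collections.emptyMap()));
  }
}
```

    Whoever builds the source injects the processor with setChannelProcessor(); the source itself only ever calls getChannelProcessor().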

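    First, a word on Source types. Flume divides sources into two kinds according to how their data arrives. Sources such as HTTP, netcat, and exec are event-driven (EventDrivenSource), while sources such as Kafka and JMS poll for data (PollableSource).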
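    The difference between the two types lies in who drives the work. The following hypothetical sketch contrasts them. PollStatus, QueuePollableSource and CallbackSource are illustrative names, not Flume's API; Flume's real enum is PollableSource.Status. A pollable source has its process() called repeatedly by someone else and answers READY or BACKOFF. An event-driven source is invoked directly when data arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative counterpart of PollableSource.Status.
enum PollStatus { READY, BACKOFF }

// Pollable style: an outside loop keeps calling process(); the source pulls
// from its upstream and reports whether it found any work.
final class QueuePollableSource {
  private final Deque<String> upstream = new ArrayDeque<>();
  final List<String> delivered = new ArrayList<>();

  void enqueueUpstream(String msg) {
    upstream.addLast(msg);
  }

  PollStatus process() {
    String msg = upstream.pollFirst();
    if (msg == null) {
      return PollStatus.BACKOFF;   // nothing to pull: ask the caller to wait
    }
    delivered.add(msg);            // real code would go through the ChannelProcessor
    return PollStatus.READY;
  }
}

// Event-driven style: no process() loop; an external event (an HTTP request,
// a line on a socket, ...) invokes the source directly.
final class CallbackSource {
  final List<String> delivered = new ArrayList<>();

  void onIncomingData(String msg) {
    delivered.add(msg);
  }
}
```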

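    As we saw in the post on the startup process, the Application starts a SourceRunner, which in turn starts the source. Because there are two kinds of source, there are also two kinds of SourceRunner: EventDrivenSourceRunner and PollableSourceRunner. Let's look at their start() methods: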

    EventDrivenSourceRunner

    public class EventDrivenSourceRunner extends SourceRunner {
      …
      @Override
      public void start() {
        Source source = getSource();
        ChannelProcessor cp = source.getChannelProcessor();
        cp.initialize();
        source.start();
        lifecycleState = LifecycleState.START;
      }
      …
    }
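    The essential point of this start() is the ordering. The ChannelProcessor is initialized before the source starts, and the runner creates no thread of its own. The sketch below replays that ordering with hypothetical recording classes (StartupLog, RecordingProcessor, RecordingSource, EventDrivenRunnerSketch) that log each step instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical log that records the order of startup steps.
final class StartupLog {
  final List<String> steps = new ArrayList<>();
}

final class RecordingProcessor {
  private final StartupLog log;

  RecordingProcessor(StartupLog log) {
    this.log = log;
  }

  void initialize() {
    log.steps.add("processor.initialize");
  }
}

final class RecordingSource {
  private final StartupLog log;
  private final RecordingProcessor processor;

  RecordingSource(StartupLog log, RecordingProcessor processor) {
    this.log = log;
    this.processor = processor;
  }

  RecordingProcessor getChannelProcessor() {
    return processor;
  }

  void start() {
    log.steps.add("source.start");
  }
}

final class EventDrivenRunnerSketch {
  // Same order as EventDrivenSourceRunner.start(): initialize the processor,
  // then start the source. From then on the source itself (for example its
  // network server) produces events; the runner starts no polling thread.
  static void start(RecordingSource source) {
    source.getChannelProcessor().initialize();
    source.start();
  }
}
```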
    

    PollableSourceRunner

    public class PollableSourceRunner extends SourceRunner {
    
      …
      @Override
      public void start() {
        PollableSource source = (PollableSource) getSource();
        ChannelProcessor cp = source.getChannelProcessor();
        cp.initialize();
        source.start();
    
        runner = new PollingRunner();
    
        runner.source = source;
        runner.counterGroup = counterGroup;
        runner.shouldStop = shouldStop;
    
        runnerThread = new Thread(runner);
        runnerThread.setName(getClass().getSimpleName() + "-" + 
            source.getClass().getSimpleName() + "-" + source.getName());
        runnerThread.start();
    
        lifecycleState = LifecycleState.START;
      }
    
      …
      public static class PollingRunner implements Runnable {
    
        private PollableSource source;
        private AtomicBoolean shouldStop;
        private CounterGroup counterGroup;
    
        @Override
        public void run() {
          logger.debug("Polling runner starting. Source:{}", source);
    
          while (!shouldStop.get()) {
            counterGroup.incrementAndGet("runner.polls");
    
            try {
              if (source.process().equals(PollableSource.Status.BACKOFF)) {
                counterGroup.incrementAndGet("runner.backoffs");
    
                Thread.sleep(Math.min(
                    counterGroup.incrementAndGet("runner.backoffs.consecutive")
                    * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
              } else {
                counterGroup.set("runner.backoffs.consecutive", 0L);
              }
            } catch (InterruptedException e) {
              logger.info("Source runner interrupted. Exiting");
              counterGroup.incrementAndGet("runner.interruptions");
            } catch (EventDeliveryException e) {
              logger.error("Unable to deliver event. Exception follows.", e);
              counterGroup.incrementAndGet("runner.deliveryErrors");
            } catch (Exception e) {
              counterGroup.incrementAndGet("runner.errors");
              logger.error("Unhandled exception, logging and sleeping for " +
                  source.getMaxBackOffSleepInterval() + "ms", e);
              try {
                Thread.sleep(source.getMaxBackOffSleepInterval());
              } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
              }
            }
          }
    
          logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
        }
    
      }
    
    }
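    The sleep time in PollingRunner grows linearly with the number of consecutive BACKOFF results and is capped at getMaxBackOffSleepInterval(). Any non-BACKOFF result resets the counter to zero. The hypothetical helper below (BackoffSketch is not a Flume class) reproduces just that arithmetic so the schedule can be checked by hand. The increments used in the example are arbitrary inputs, not values taken from this article.

```java
// Hypothetical helper reproducing the sleep computation in PollingRunner.run():
//   sleep = min(consecutiveBackoffs * backOffSleepIncrement, maxBackOffSleepInterval)
final class BackoffSketch {
  static long sleepMillis(long consecutiveBackoffs, long incrementMs, long maxMs) {
    return Math.min(consecutiveBackoffs * incrementMs, maxMs);
  }

  // Replays a sequence of process() outcomes (true = BACKOFF) and returns the
  // sleep chosen after each one. 0 means the loop polls again immediately.
  static long[] schedule(boolean[] backoffs, long incrementMs, long maxMs) {
    long[] sleeps = new long[backoffs.length];
    long consecutive = 0;
    for (int i = 0; i < backoffs.length; i++) {
      if (backoffs[i]) {
        consecutive++;           // like incrementAndGet("runner.backoffs.consecutive")
        sleeps[i] = sleepMillis(consecutive, incrementMs, maxMs);
      } else {
        consecutive = 0;         // like set("runner.backoffs.consecutive", 0L)
        sleeps[i] = 0;
      }
    }
    return sleeps;
  }
}
```

    For example, schedule(new boolean[]{true, true, true, true, true, true, false, true}, 1000, 5000) yields 1000, 2000, 3000, 4000, 5000, 5000, 0, 1000. The fifth backoff reaches the cap, the sixth stays there, and the READY result restarts the sequence.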
    

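    Both PollableSourceRunner and EventDrivenSourceRunner first initialize the source's ChannelProcessor and then call the source's start(). PollableSourceRunner also starts a PollingRunner thread that calls process() repeatedly and sleeps when the source reports BACKOFF. This raises a question: where does the ChannelProcessor come from? To answer it we have to go back to AbstractConfigurationProvider, where loadSources() contains the following code: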

    ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
    ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);
    ChannelProcessor channelProcessor = new ChannelProcessor(selector);
    Configurables.configure(channelProcessor, config);
    source.setChannelProcessor(channelProcessor);
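    Each source therefore gets its own ChannelProcessor, built around a ChannelSelector that decides which of the source's channels an event goes to. The following sketch models that wiring with hypothetical classes (MemChannel, SimpleSelector, ReplicatingSelectorSketch, ProcessorSketch). It assumes replicating behaviour, where every channel receives every event. It omits the transactions, optional channels and interceptors that the real ChannelProcessor handles.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory channel.
final class MemChannel {
  final String name;
  final List<String> events = new ArrayList<>();

  MemChannel(String name) {
    this.name = name;
  }

  void put(String event) {
    events.add(event);
  }
}

// Hypothetical selector contract: which channels must receive this event?
interface SimpleSelector {
  List<MemChannel> getRequiredChannels(String event);
}

// Replicating behaviour: every configured channel receives every event.
final class ReplicatingSelectorSketch implements SimpleSelector {
  private final List<MemChannel> channels;

  ReplicatingSelectorSketch(List<MemChannel> channels) {
    this.channels = channels;
  }

  @Override
  public List<MemChannel> getRequiredChannels(String event) {
    return channels;
  }
}

// Plays the role of "new ChannelProcessor(selector)": it asks the selector
// where the event goes and puts it into each of those channels.
final class ProcessorSketch {
  private final SimpleSelector selector;

  ProcessorSketch(SimpleSelector selector) {
    this.selector = selector;
  }

  void processEvent(String event) {
    for (MemChannel ch : selector.getRequiredChannels(event)) {
      ch.put(event);
    }
  }
}
```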
    

    That covers the basics of how a Source is started. Next, using AvroSource as an example, let's see where a source calls the ChannelProcessor to insert events.

    2 AvroSource

    public class AvroSource extends AbstractSource implements EventDrivenSource,
        Configurable, AvroSourceProtocol {
      …
      @Override
      public Status append(AvroFlumeEvent avroEvent) {
        if (logger.isDebugEnabled()) {
          if (LogPrivacyUtil.allowLogRawData()) {
            logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
          } else {
            logger.debug("Avro source {}: Received avro event", getName());
          }
        }
    
        sourceCounter.incrementAppendReceivedCount();
        sourceCounter.incrementEventReceivedCount();
    
        Event event = EventBuilder.withBody(avroEvent.getBody().array(),
            toStringMap(avroEvent.getHeaders()));
    
        try {
          getChannelProcessor().processEvent(event);
        } catch (ChannelException ex) {
          logger.warn("Avro source " + getName() + ": Unable to process event. " +
              "Exception follows.", ex);
          return Status.FAILED;
        }
    
        sourceCounter.incrementAppendAcceptedCount();
        sourceCounter.incrementEventAcceptedCount();
    
        return Status.OK;
      }
    
      @Override
      public Status appendBatch(List<AvroFlumeEvent> events) {
        logger.debug("Avro source {}: Received avro event batch of {} events.",
            getName(), events.size());
        sourceCounter.incrementAppendBatchReceivedCount();
        sourceCounter.addToEventReceivedCount(events.size());
    
        List<Event> batch = new ArrayList<Event>();
    
        for (AvroFlumeEvent avroEvent : events) {
          Event event = EventBuilder.withBody(avroEvent.getBody().array(),
              toStringMap(avroEvent.getHeaders()));
    
          batch.add(event);
        }
    
        try {
          getChannelProcessor().processEventBatch(batch);
        } catch (Throwable t) {
          logger.error("Avro source " + getName() + ": Unable to process event " +
              "batch. Exception follows.", t);
          if (t instanceof Error) {
            throw (Error) t;
          }
          return Status.FAILED;
        }
    
        sourceCounter.incrementAppendBatchAcceptedCount();
        sourceCounter.addToEventAcceptedCount(events.size());
    
        return Status.OK;
      }
      …
    }
    
    

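    In append() we can see the call getChannelProcessor().processEvent(event), and appendBatch() calls getChannelProcessor().processEventBatch(batch) in the same way. Each Source, depending on its own trigger or polling mechanism, calls the ChannelProcessor at the appropriate moment to insert events into the channels.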
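    The batch path in appendBatch() follows a recognizable pattern. It counts what was received, converts it, delivers the whole batch through the processor, and counts the events as accepted only if delivery succeeded. The sketch below isolates that pattern. BatchAppender, AppendStatus and the Consumer standing in for processEventBatch() are hypothetical, and events are plain strings rather than Flume Events.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative counterpart of the Avro Status returned to the client.
enum AppendStatus { OK, FAILED }

final class BatchAppender {
  long received;
  long accepted;
  // Stands in for getChannelProcessor().processEventBatch(...).
  private final Consumer<List<String>> processEventBatch;

  BatchAppender(Consumer<List<String>> processEventBatch) {
    this.processEventBatch = processEventBatch;
  }

  AppendStatus appendBatch(List<byte[]> rawEvents) {
    received += rawEvents.size();              // counted before delivery
    List<String> batch = new ArrayList<>();
    for (byte[] raw : rawEvents) {
      batch.add(new String(raw, StandardCharsets.UTF_8));  // plays the role of EventBuilder.withBody
    }
    try {
      processEventBatch.accept(batch);
    } catch (RuntimeException e) {
      // AvroSource catches Throwable and rethrows Errors; this sketch only
      // catches RuntimeException and reports it as FAILED.
      return AppendStatus.FAILED;
    }
    accepted += rawEvents.size();              // counted only on success
    return AppendStatus.OK;
  }
}
```

    Keeping "received" and "accepted" as separate counters is what lets the source metrics reveal how many events were rejected downstream.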

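    This completes our study of Flume's startup process and its three core components. Given my limited ability, some details could not be explored in depth; I hope to find time to continue the analysis later.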

  Original article: https://www.cnblogs.com/simple-focus/p/6535833.html