FLUME SOURCE
记得在AbstractConfigurationProvider中的getConfiguration方法中会执行
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
方法。这个方法会中会有个SourceFactory去创建不同类型的SourceRunnner。
这就是Source 的入口。
整个source的代码分在好几处:
- flume-ng-core的org.apache.flume包里面。
- flume-ng-core的org.apache.flume.source包里面。
- flume-ng-source里面。
Source根据分为两种,PollableSource (轮训拉取)和 EventDrivenSource (事件驱动),说白了一个是主动,一个是被动。
PollableSource 的类图如下:
这里先解释下LifecycleAware这个接口,它定义了每个组件的状态,启动的时候都会调用start方法,停止的时候都会调用stop方法。
public interface LifecycleAware {
public void start();
public void stop();
public LifecycleState getLifecycleState();
}
Source接口的定义如下.里面说明了每个Source都需要指定对应的channel。
public interface Source extends LifecycleAware, NamedComponent {
public void setChannelProcessor(ChannelProcessor channelProcessor);
public ChannelProcessor getChannelProcessor();
}
PollableSource 这个接口,增加了等待延时和最长延时的方法。
AbstractPollableSource 抽象类继承了BasicSourceSemantics ,
BasicSourceSemantics实现了组件在start和stop的时候需要做的一些公共的事情,例如设置LifecycleState 的状态。
整个AbstractPollableSource 将所有source需要做的和Flume环境相关的事情都写好了,其余的每个source在实现的时候,只需要实现这些source自己的启动方式就好了。以KafkaSource 为例,它的doStart()、doStop()、doConfigure()只需要考虑和kafka相关的实现就可以。
再看PollableSourceRunner这个类,它的作用就是创就按一个线程来启动不同的Source组件。