I have not run this; the flow below is inferred purely from reading the code.
log4j configuration:
#log4j.logger.com.loadbalance= DEBUG,loadbalance
#log4j.additivity.com.loadbalance= true
log4j.appender.loadbalance = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.loadbalance.Hosts =machine-2:51515 hadoop-master:51515
#log4j.appender.loadbalance.UnsafeMode = true
log4j.appender.loadbalance.MaxBackoff = 30000
# Selector: ROUND_ROBIN (default), RANDOM, or the FQCN of a custom selector class
log4j.appender.loadbalance.Selector = RANDOM
log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout
log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
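To make the configuration more concrete, here is a minimal sketch of how the appender settings above could be translated into properties for a load-balancing RPC client. The key names (`client.type`, `hosts`, `hosts.h1`, `host-selector`, `backoff`, `maxBackoff`) follow the load-balancing client section of the Flume Developer Guide. The class `LoadBalanceProps` and its method are hypothetical, and whether the appender builds its properties exactly this way is an assumption.

```java
import java.util.Properties;

/** Hypothetical helper: maps appender settings to load-balancing client properties. */
final class LoadBalanceProps {

    static Properties fromAppenderSettings(String hosts, String selector, long maxBackoffMillis) {
        Properties props = new Properties();
        props.setProperty("client.type", "default_loadbalance");

        // "Hosts" is a whitespace-separated list of host:port pairs.
        String[] hostList = hosts.trim().split("\\s+");
        StringBuilder names = new StringBuilder();
        for (int i = 0; i < hostList.length; i++) {
            String name = "h" + (i + 1);            // alias for each host: h1, h2, ...
            if (i > 0) {
                names.append(' ');
            }
            names.append(name);
            props.setProperty("hosts." + name, hostList[i]);
        }
        props.setProperty("hosts", names.toString());

        // Passed through unchanged so that a custom class name stays intact.
        props.setProperty("host-selector", selector);

        // Assumption: backoff is only switched on when a positive limit is given.
        if (maxBackoffMillis > 0) {
            props.setProperty("backoff", "true");
            props.setProperty("maxBackoff", Long.toString(maxBackoffMillis));
        }
        return props;
    }
}
```

Calling `LoadBalanceProps.fromAppenderSettings("machine-2:51515 hadoop-master:51515", "RANDOM", 30000)` yields one `hosts.hN` entry per configured host.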
LoadBalancingLog4jAppender is a subclass of Log4jAppender, so logging a message ends up in Log4jAppender's append method.
That append method assembles the Flume Event and finally sends it:
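    try {
      rpcClient.append(flumeEvent);
    } catch (EventDeliveryException e) {
      String msg = "Flume append() failed.";
      LogLog.error(msg);
      if (unsafeMode) {
        // UnsafeMode: swallow the failure so the calling application is not interrupted
        return;
      }
      // ... (the remainder of the catch block is not shown in this excerpt)
    }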
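The excerpt stops before showing what happens when unsafeMode is false. The following self-contained sketch illustrates the two paths under the assumption that the failure is rethrown in that case. `SketchRpcClient` and `UnsafeModeSketch` are hypothetical stand-ins, not Flume classes, and `IllegalStateException` is used here only in place of whatever exception the real appender raises.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the RPC client held by the appender. */
interface SketchRpcClient {
    void append(String event) throws Exception;
}

/** Hypothetical appender showing the effect of the UnsafeMode flag. */
final class UnsafeModeSketch {
    private final SketchRpcClient client;
    private final boolean unsafeMode;
    final List<String> errors = new ArrayList<>();   // stands in for LogLog.error output

    UnsafeModeSketch(SketchRpcClient client, boolean unsafeMode) {
        this.client = client;
        this.unsafeMode = unsafeMode;
    }

    /** Returns true when the event was delivered, false when a failure was swallowed. */
    boolean append(String event) {
        try {
            client.append(event);
            return true;
        } catch (Exception e) {
            String msg = "Flume append() failed.";
            errors.add(msg);
            if (unsafeMode) {
                return false;                         // drop the event, keep the application running
            }
            // Assumption: without UnsafeMode the failure reaches the caller.
            throw new IllegalStateException(msg, e);
        }
    }
}
```

With `unsafeMode` set to true a dead agent only costs log events; with it set to false, under this assumption, every logging call can throw into application code.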
Given the log4j configuration above, this rpcClient is a LoadBalancingRpcClient instance.
Its append method is then executed:
    Iterator<HostInfo> it = selector.createHostIterator();
    while (it.hasNext()) {
      HostInfo host = it.next();
      try {
        RpcClient client = getClient(host);
        client.append(event);
        eventSent = true;
        break;
      } catch (Exception ex) {
        selector.informFailure(host);
        LOGGER.warn("Failed to send event to host " + host, ex);
      }
    }
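The client walks through the user-configured hosts in the order produced by the selector, obtains the RPC client for each host, and leaves the loop as soon as one send succeeds. A host that fails is reported to the selector and the next host is tried.
The successful send in turn invokes the append method of the AvroSource on the remote server: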
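The failover loop can be illustrated with a small self-contained sketch. It uses a simple round-robin ordering and plain host strings. `FailoverSketch` and its `Sender` interface are hypothetical names, not Flume's classes, and throwing `IllegalStateException` when every host fails is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical round-robin failover loop over a fixed host list. */
final class FailoverSketch {

    /** Hypothetical transport: throws when the host cannot accept the event. */
    interface Sender {
        void send(String host, String event) throws Exception;
    }

    private final List<String> hosts;
    private int next = 0;                              // round-robin cursor
    final List<String> failed = new ArrayList<>();     // hosts reported as failed

    FailoverSketch(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = new ArrayList<>(hosts);
    }

    /** Returns all hosts, starting one position further on each call. */
    private List<String> hostOrder() {
        List<String> order = new ArrayList<>();
        for (int i = 0; i < hosts.size(); i++) {
            order.add(hosts.get((next + i) % hosts.size()));
        }
        next = (next + 1) % hosts.size();
        return order;
    }

    /** Tries each host in turn and returns the one that accepted the event. */
    String append(String event, Sender sender) {
        for (String host : hostOrder()) {
            try {
                sender.send(host, event);
                return host;                           // first success ends the loop
            } catch (Exception ex) {
                failed.add(host);                      // analogous to selector.informFailure(host)
            }
        }
        throw new IllegalStateException("Unable to send event to any host");
    }
}
```

If `machine-2:51515` is down, the first call tries it, records the failure, and delivers to `hadoop-master:51515`; the next call starts with `hadoop-master:51515` directly.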
    @Override
    public Status append(AvroFlumeEvent avroEvent) {
      logger.debug("Avro source {}: Received avro event: {}", getName(),
          avroEvent);
      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();

      Event event = EventBuilder.withBody(avroEvent.getBody().array(),
          toStringMap(avroEvent.getHeaders()));

      try {
        getChannelProcessor().processEvent(event);
      } catch (ChannelException ex) {
        logger.warn("Avro source " + getName() + ": Unable to process event. " +
            "Exception follows.", ex);
        return Status.FAILED;
      }

      sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();

      return Status.OK;
    }
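The source puts the Event into the channel inside a transaction and returns the result over RPC. Once Status.OK comes back, the client has successfully delivered one Event to the channel and carries on with the rest of its business logic; a Status.FAILED result means the channel did not accept the Event.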
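The server-side step can be sketched in a self-contained way as well. The example below models the channel as a bounded in-memory queue and the transaction as a staged put that is either committed or rolled back. `SourceSketch`, its capacity parameter, and its counters are hypothetical simplifications, not Flume's AvroSource or channel implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical source that puts events into a bounded channel transactionally. */
final class SourceSketch {

    enum Status { OK, FAILED }

    private final Deque<String> channel = new ArrayDeque<>();
    private final int capacity;
    int received;                                      // events seen by the source
    int accepted;                                      // events committed to the channel

    SourceSketch(int capacity) {
        this.capacity = capacity;
    }

    Status append(String body) {
        received++;

        List<String> staged = new ArrayList<>();       // begin transaction
        staged.add(body);                              // put

        if (channel.size() + staged.size() > capacity) {
            staged.clear();                            // rollback: channel is full
            return Status.FAILED;
        }
        channel.addAll(staged);                        // commit
        accepted++;
        return Status.OK;
    }

    int channelSize() {
        return channel.size();
    }
}
```

With a capacity of 1, the first `append` returns `Status.OK` and the second returns `Status.FAILED`, which is the result the load-balancing client on the other side would see.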