zoukankan      html  css  js  c++  java
  • log4j到flume的过程(LoadBalancingLog4jAppender)

    没有运行,仅仅凭看代码推测的运行过程如下。

    log4j配置

    #log4j.logger.com.loadbalance= DEBUG,loadbalance
    #log4j.additivity.com.loadbalance= true
    log4j.appender.loadbalance = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
    log4j.appender.loadbalance.Hosts =machine-2:51515 hadoop-master:51515
    #log4j.appender.loadbalance.UnsafeMode = true
    log4j.appender.out2.MaxBackoff = 30000
    #FQDN RANDOM ,default is ROUND_ROBIN
    log4j.appender.loadbalance.Selector = RANDOM
    log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout
    log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n

    LoadBalancingLog4jAppender是Log4jAppender的子类,会调用Log4jAppender的append方法

    该append方法中会组织Event事件,最后

    try {
      rpcClient.append(flumeEvent);
    } catch (EventDeliveryException e) {
      String msg = "Flume append() failed.";
      LogLog.error(msg);
      if (unsafeMode) {
      return;
    }

    该rpcClient根据log4j中配置的获得 LoadBalancingRpcClient 实例

    执行 LoadBalancingRpcClient 的append方法

    Iterator<HostInfo> it = selector.createHostIterator();

    while (it.hasNext()) {
      HostInfo host = it.next();
      try {
        RpcClient client = getClient(host);
        client.append(event);
        eventSent = true;
        break;
      } catch (Exception ex) {
        selector.informFailure(host);
        LOGGER.warn("Failed to send event to host " + host, ex);
      }
    }

    选择用户设置的地址,获得对应的Rpc通信,发送成功,则跳出。

    之后,会调用远端服务器的AvroSource的append方法

    @Override
    public Status append(AvroFlumeEvent avroEvent) {
      logger.debug("Avro source {}: Received avro event: {}", getName(),
      avroEvent);
      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();

      Event event = EventBuilder.withBody(avroEvent.getBody().array(),
      toStringMap(avroEvent.getHeaders()));

      try {
        getChannelProcessor().processEvent(event);
      } catch (ChannelException ex) {
        logger.warn("Avro source " + getName() + ": Unable to process event. " +
        "Exception follows.", ex);
        return Status.FAILED;
      }

      sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();

      return Status.OK;
    }

    直接将Event,在事务内发送到channel中,返回rpc的结果,这样

    客户端就将一个Event成功发送到channel中,继续完成下面的业务操作。

  • 相关阅读:
    【1801日語听解4】第14回:6月9日
    【日語听解2】第14回:6月8日
    【日語視聴説2】第14回:6月8日
    【日本語新聞編集】第13回:6月5日
    【1801日語写作】第13回:6月4日
    【日本語新聞選読】第13回:6月2日
    Win10版本
    家庭或小规模无线使用方
    批处理bat复制命令Copy与Xcopy
    批处理bat删除命令
  • 原文地址:https://www.cnblogs.com/kanliwei/p/4274900.html
Copyright © 2011-2022 走看看