zoukankan      html  css  js  c++  java
  • log4j到flume的过程(LoadBalancingLog4jAppender)

    没有运行,仅仅凭看代码推测的运行过程如下。

    log4j配置

    #log4j.logger.com.loadbalance= DEBUG,loadbalance
    #log4j.additivity.com.loadbalance= true
    log4j.appender.loadbalance = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
    log4j.appender.loadbalance.Hosts =machine-2:51515 hadoop-master:51515
    #log4j.appender.loadbalance.UnsafeMode = true
    log4j.appender.out2.MaxBackoff = 30000
    #FQDN RANDOM ,default is ROUND_ROBIN
    log4j.appender.loadbalance.Selector = RANDOM
    log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout
    log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n

    LoadBalancingLog4jAppender是Log4jAppender的子类,会调用Log4jAppender的append方法

    该append方法中会组织Event事件,最后

    try {
      rpcClient.append(flumeEvent);
    } catch (EventDeliveryException e) {
      String msg = "Flume append() failed.";
      LogLog.error(msg);
      if (unsafeMode) {
      return;
    }

    该rpcClient根据log4j中配置的获得 LoadBalancingRpcClient 实例

    执行 LoadBalancingRpcClient 的append方法

    Iterator<HostInfo> it = selector.createHostIterator();

    while (it.hasNext()) {
      HostInfo host = it.next();
      try {
        RpcClient client = getClient(host);
        client.append(event);
        eventSent = true;
        break;
      } catch (Exception ex) {
        selector.informFailure(host);
        LOGGER.warn("Failed to send event to host " + host, ex);
      }
    }

    选择用户设置的地址,获得对应的Rpc通信,发送成功,则跳出。

    之后,会调用远端服务器的AvroSource的append方法

    @Override
    public Status append(AvroFlumeEvent avroEvent) {
      logger.debug("Avro source {}: Received avro event: {}", getName(),
      avroEvent);
      sourceCounter.incrementAppendReceivedCount();
      sourceCounter.incrementEventReceivedCount();

      Event event = EventBuilder.withBody(avroEvent.getBody().array(),
      toStringMap(avroEvent.getHeaders()));

      try {
        getChannelProcessor().processEvent(event);
      } catch (ChannelException ex) {
        logger.warn("Avro source " + getName() + ": Unable to process event. " +
        "Exception follows.", ex);
        return Status.FAILED;
      }

      sourceCounter.incrementAppendAcceptedCount();
      sourceCounter.incrementEventAcceptedCount();

      return Status.OK;
    }

    直接将Event,在事务内发送到channel中,返回rpc的结果,这样

    客户端就将一个Event成功发送到channel中,继续完成下面的业务操作。

  • 相关阅读:
    判断窗体 show完成
    【洛谷1349】广义斐波那契数列
    【洛谷2744 】【CJOJ1804】[USACO5.3]量取牛奶Milk Measuring
    【洛谷T7153】(考试) 中位数
    【洛谷T7152】(考试题目)细胞
    【洛谷1962】 斐波那契数列
    【洛谷1855】 榨取kkksc03
    【HDU2255】奔小康赚大钱
    【洛谷1402】酒店之王
    【洛谷1607】【USACO09FEB】庙会班车
  • 原文地址:https://www.cnblogs.com/kanliwei/p/4274900.html
Copyright © 2011-2022 走看看