zoukankan      html  css  js  c++  java
  • Flume源码-LoggerSink

    package org.apache.flume.sink;
    
    import com.google.common.base.Strings;
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.EventHelper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Sink that takes one event per {@link #process()} call from its channel and
     * writes it to the SLF4J logger at INFO level.
     *
     * <p>The number of bytes handed to
     * {@link EventHelper#dumpEvent(Event, int)} as the dump limit is controlled by
     * the {@value #MAX_BYTES_DUMP_KEY} configuration property and defaults to
     * {@link #DEFAULT_MAX_BYTE_DUMP}.
     */
    public class LoggerSink extends AbstractSink implements Configurable {

      private static final Logger logger = LoggerFactory
          .getLogger(LoggerSink.class);

      // Default Max bytes to dump
      public static final int DEFAULT_MAX_BYTE_DUMP = 16;

      // Max number of bytes to be dumped
      private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;

      // Name of the configuration property that overrides the dump limit
      public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog";

      /**
       * Reads the optional {@value #MAX_BYTES_DUMP_KEY} property from the context.
       *
       * <p>If the property is absent or empty the current limit is left untouched.
       * If it is present but is not a non-negative integer, a warning is logged
       * and the limit is set to {@link #DEFAULT_MAX_BYTE_DUMP}.
       *
       * @param context configuration to read the property from
       */
      @Override
      public void configure(Context context) {
        String strMaxBytes = context.getString(MAX_BYTES_DUMP_KEY);
        if (Strings.isNullOrEmpty(strMaxBytes)) {
          return;
        }

        try {
          // Tolerate stray whitespace around the number in the configuration.
          int parsed = Integer.parseInt(strMaxBytes.trim());
          if (parsed < 0) {
            // A negative byte count is meaningless as a dump limit; refuse it
            // here rather than passing it on for every event.
            logger.warn("Negative value {} is not allowed for " + MAX_BYTES_DUMP_KEY
                + ", using default value({})", strMaxBytes, DEFAULT_MAX_BYTE_DUMP);
            parsed = DEFAULT_MAX_BYTE_DUMP;
          }
          maxBytesToLog = parsed;
        } catch (NumberFormatException e) {
          // Report the real property name so the operator knows what to fix.
          logger.warn("Unable to convert {} to integer, using default value({}) for "
              + MAX_BYTES_DUMP_KEY, strMaxBytes, DEFAULT_MAX_BYTE_DUMP);
          maxBytesToLog = DEFAULT_MAX_BYTE_DUMP;
        }
      }

      /**
       * Takes at most one event from the channel inside a transaction and logs it.
       *
       * @return {@code Status.READY} when an event was taken, or
       *         {@code Status.BACKOFF} when the channel returned no event
       * @throws EventDeliveryException if taking, logging or committing fails; the
       *         transaction is rolled back before the exception is thrown
       */
      @Override
      public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;

        try {
          transaction.begin();
          event = channel.take();

          if (event != null) {
            // Guarded so the dump is not built when INFO logging is disabled.
            if (logger.isInfoEnabled()) {
              logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
            }
          } else {
            // No event found, request back-off semantics from the sink runner
            result = Status.BACKOFF;
          }
          transaction.commit();
        } catch (Exception ex) {
          try {
            transaction.rollback();
          } catch (Exception rollbackFailure) {
            // Log instead of rethrowing so that the original failure, not the
            // rollback failure, is what reaches the caller.
            logger.error("Failed to roll back transaction after: " + ex,
                rollbackFailure);
          }
          throw new EventDeliveryException("Failed to log event: " + event, ex);
        } finally {
          transaction.close();
        }

        return result;
      }
    }
  • 相关阅读:
    宝塔相关问题
    免费xshell下载
    服务器断电mysql无法恢复
    mysql相关知识
    svn
    tortoisesvn下载 和svn 安装
    nginx+lua乐观锁实现秒杀
    c# asp.net 生成唯一订单号
    c# 关闭软件 进程 杀死进程
    国内开源软件镜像地址搜集
  • 原文地址:https://www.cnblogs.com/mengyao/p/4907103.html
Copyright © 2011-2022 走看看