  • <Flume><Source Code><Flume Source Code Reading Notes>

    Overview

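    • Logs collected by a source are first passed to the ChannelProcessor. The ChannelProcessor runs them through the Interceptors for filtering and transformation, then uses the ChannelSelector to choose the channel(s).

    • Sources and sinks run asynchronously with respect to each other. A sink only needs to watch the channel it is attached to.

    • A sink can fail to write, so Flume offers the following strategies:

      • By default there is a single sink. If a write fails, the transaction fails and is retried later.

      • Failover: multiple sinks are given priorities, and on failure events are routed to the sink with the next priority. A sink counts as failed as soon as it throws a single exception. It is then removed from the set of live sinks and retried after an exponentially growing wait, which starts at 1 s by default and is capped at 30 s.

    • Flume also provides a load-balancing strategy with two built-in algorithms, round-robin and random. Sinks are chosen through a SinkSelector abstraction, analogous to ChannelSelector.

    • How sources and sinks run asynchronously, and how channels implement transactions, is covered in the source-code analysis below.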
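The two sink strategies above can be illustrated with a small Java sketch. It assumes three things: the failover penalty starts at 1 s, it doubles with each consecutive failure up to the 30 s cap described above, and a success resets it. Flume's own failover processor may compute the exact sequence differently. FailurePenalty and SinkPicker are hypothetical names, not Flume classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical penalty for a failed sink: 1 s, doubling per consecutive failure, capped at 30 s (assumed). */
class FailurePenalty {
  static final long BASE_MS = 1_000L;
  static final long MAX_MS = 30_000L;
  private int consecutiveFailures;

  /** Records one failure and returns how long the sink stays out of the live set. */
  long recordFailure() {
    // Clamp the shift so the long cannot overflow, then apply the cap.
    long penalty = BASE_MS << Math.min(consecutiveFailures, 30);
    consecutiveFailures++;
    return Math.min(penalty, MAX_MS);
  }

  /** Assumption: a successful retry puts the sink back with a clean record. */
  void recordSuccess() {
    consecutiveFailures = 0;
  }
}

/** Hypothetical load-balancing selector offering round-robin and random picks. */
class SinkPicker<T> {
  private final List<T> sinks;
  private int next;

  SinkPicker(List<T> sinks) {
    if (sinks.isEmpty()) throw new IllegalArgumentException("need at least one sink");
    this.sinks = new ArrayList<>(sinks);
  }

  /** Round-robin: cycle through the sinks in order. */
  synchronized T roundRobin() {
    T sink = sinks.get(next);
    next = (next + 1) % sinks.size();
    return sink;
  }

  /** Random: pick any sink uniformly. */
  T random() {
    return sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
  }
}
```

With these assumptions, five consecutive failures produce waits of 1, 2, 4, 8 and 16 s. Every wait after that stays at 30 s.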

    The whole process

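    • First, starting Flume. There are two ways to start it: embedded in a Java application via EmbeddedAgent, or as a standalone process via Application. Running a separate process with Application is more common, and that is the mode analyzed here.

    • Entry point: the main method of org.apache.flume.node.Application.

    • Note: I have not studied how ZooKeeper works yet, so the ZK-related parts are skipped here.

    • The Flume startup sequence is roughly as follows:

      1. Define the startup options: their defaults and whether each one is required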

        Options options = new Options();

        Option option = new Option("n", "name", true, "the name of this agent");
        option.setRequired(true);
        options.addOption(option);

        option = new Option("f", "conf-file", true,
                  "specify a config file (required if -z missing)");
        option.setRequired(false);
        options.addOption(option);
        ......
      2. Parse the command-line arguments

        if (commandLine.hasOption('h')) {
           new HelpFormatter().printHelp("flume-ng agent", options, true);
           return;
        }
        String agentName = commandLine.getOptionValue('n');
        boolean reload = !commandLine.hasOption("no-reload-conf"); // whether to reload the config file

        if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
            isZkConfigured = true;
        }
      3. ZooKeeper-related setup: skipped for now

      4. Open the configuration file

           
        if (isZkConfigured) {
          ... // if ZooKeeper is configured, start from the ZK settings
        } else {
          // open the configuration file; fail fast if it does not exist
          File configurationFile = new File(commandLine.getOptionValue('f'));

          // make sure the agent fails to start when there is no config file
          if (!configurationFile.exists()) {
            ...// If command line invocation, then need to fail fast
          }
          List<LifecycleAware> components = Lists.newArrayList();

          // the config file should be reloaded periodically
          if (reload) {
            // use an EventBus to allow publish-subscribe-style communication
            EventBus eventBus = new EventBus(agentName + "-event-bus");
            // reads the config file and polls it periodically, here every 30 s
            PollingPropertiesFileConfigurationProvider configurationProvider =
                new PollingPropertiesFileConfigurationProvider(
                    agentName, configurationFile, eventBus, 30);
            components.add(configurationProvider);
            // register the provider as a component of the Application
            application = new Application(components);
            // register the application with the EventBus, which picks up the
            // Application methods annotated with @Subscribe
            // TODO: EventBus, and why reload configuration
            eventBus.register(application);
          } else {
            // the config file is not reloaded
            PropertiesFileConfigurationProvider configurationProvider =
                new PropertiesFileConfigurationProvider(agentName, configurationFile);
            application = new Application();
            // initialize the Flume components directly from the config file
            application.handleConfigurationEvent(configurationProvider.getConfiguration());
          }
        }
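      5. Configuration reload: when reload is enabled, it is implemented with the EventBus. Application.handleConfigurationEvent is the subscriber, and PollingPropertiesFileConfigurationProvider is the publisher. The provider periodically polls the file for changes. When the file has changed, it re-reads the file and publishes a configuration-change event. handleConfigurationEvent receives that event and re-initializes the components.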

      6. handleConfigurationEvent:

        @Subscribe
        public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
          // MaterializedConfiguration holds the components Flume needs at runtime: Sources, Channels,
          // Sinks, SourceRunners, SinkRunners, etc. It is produced by a ConfigurationProvider. For example,
          // PollingPropertiesFileConfigurationProvider reads the config file and builds the components.
          stopAllComponents();      // stop all components
          startAllComponents(conf); // start all components from the new configuration
        }
      7. startAllComponents

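        • Channels must be started first, and all of them must be up before continuing. Next the SinkRunners are started, so the consumers are ready. Finally the SourceRunners are started and begin collecting logs.

        • LifecycleSupervisor is a watchdog for these components. By default it restarts a component automatically when something goes wrong.

        • Every component is started with supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); how this actually starts the component is covered later.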

        private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
            logger.info("Starting new configuration:{}", materializedConfiguration);

            this.materializedConfiguration = materializedConfiguration;

            // start the channels
            for (Entry<String, Channel> entry :
                materializedConfiguration.getChannels().entrySet()) {
              try {
                logger.info("Starting Channel " + entry.getKey());
                // TODO: how LifecycleSupervisor starts components
                // new SupervisorPolicy.AlwaysRestartPolicy(): always restart on failure
                // LifecycleState.START: the desired state for the component is START
                supervisor.supervise(entry.getValue(),
                    new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
              } catch (Exception e) {
                logger.error("Error while starting {}", entry.getValue(), e);
              }
            }

            /*
             * Wait for all channels to start.
             */
            for (Channel ch : materializedConfiguration.getChannels().values()) {
              while (ch.getLifecycleState() != LifecycleState.START
                  && !supervisor.isComponentInErrorState(ch)) {
                try {
                  logger.info("Waiting for channel: " + ch.getName() +
                      " to start. Sleeping for 500 ms");
                  Thread.sleep(500);
                } catch (InterruptedException e) {
                  logger.error("Interrupted while waiting for channel to start.", e);
                  Throwables.propagate(e);
                }
              }
            }

            // start the SinkRunners
            for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
              try {
                logger.info("Starting Sink " + entry.getKey());
                supervisor.supervise(entry.getValue(),
                    new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
              } catch (Exception e) {
                logger.error("Error while starting {}", entry.getValue(), e);
              }
            }

            // start the SourceRunners   TODO: SourceRunner & SinkRunner
            for (Entry<String, SourceRunner> entry :
                 materializedConfiguration.getSourceRunners().entrySet()) {
              try {
                logger.info("Starting Source " + entry.getKey());
                supervisor.supervise(entry.getValue(),
                    new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
              } catch (Exception e) {
                logger.error("Error while starting {}", entry.getValue(), e);
              }
            }

            this.loadMonitoring();
          }
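      8. main then calls application.start();

        /**
        start() iterates over the components registered with the Application and hands each one to the supervisor. The supervisor's default policy restarts a component automatically when something goes wrong. If configuration reload is enabled, PollingPropertiesFileConfigurationProvider was registered as a component when the Application was created. It is therefore supervised too, and restarted on failure.
        **/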
        public synchronized void start() {
          // private final List<LifecycleAware> components;
            for (LifecycleAware component : components) {
              // private final LifecycleSupervisor supervisor;
              supervisor.supervise(component,
                  new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            }
          }

        The corresponding stop path. First, in main:

        final Application appReference = application;
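        // Runtime.getRuntime(): Returns the runtime object associated with the current Java application.
        /**
        addShutdownHook: registers a new virtual-machine shutdown hook.
        The JVM shuts down in two cases: 1) it exits normally, when the last non-daemon thread exits or System.exit is called; 2) the user interrupts it, e.g. with Ctrl-C.
        **/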
        Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
           @Override
           public void run() {
               appReference.stop();
           }
        });
        // Application.stop():
        public synchronized void stop() {
          // stop the supervisor and the monitoring server
          supervisor.stop();
          if (monitorServer != null) {
            monitorServer.stop();
          }
        }
      9. That completes the walkthrough of the Application flow.

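    • The overall flow can be summarized as follows:

      1. Initialize the command-line options;

      2. Read the configuration file;

      3. Initialize the components defined in the configuration file, depending on whether reload is enabled. With reload, changes are published and subscribed through the EventBus;

      4. Create the Application and its LifecycleSupervisor, stop all components, then start all components. Start order: Channel, SinkRunner, SourceRunner. Each component is registered with the supervisor, and the monitoring service (MonitorService) is initialized. Stop order: SourceRunner, SinkRunner, Channel;

      5. If the configuration file is reloaded periodically, register PollingPropertiesFileConfigurationProvider with the supervisor;

      6. Finally, register a JVM shutdown hook that stops the supervisor and the monitoring service.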
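The reload path and the start/stop ordering summarized above can be sketched without Guava or Flume. MiniBus stands in for EventBus, Conf for MaterializedConfiguration, and PollingProvider for PollingPropertiesFileConfigurationProvider. All of them are hypothetical, simplified classes. Components are reduced to names so the ordering is easy to check. The bus delivers events synchronously, which may differ from a real event bus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for an event bus: synchronous publish/subscribe. */
class MiniBus<T> {
  private final List<Consumer<T>> subscribers = new ArrayList<>();

  void register(Consumer<T> subscriber) { subscribers.add(subscriber); }

  void post(T event) {
    for (Consumer<T> s : subscribers) s.accept(event);
  }
}

/** Hypothetical stand-in for MaterializedConfiguration: component names grouped by kind. */
class Conf {
  final List<String> channels, sinkRunners, sourceRunners;

  Conf(List<String> channels, List<String> sinkRunners, List<String> sourceRunners) {
    this.channels = channels;
    this.sinkRunners = sinkRunners;
    this.sourceRunners = sourceRunners;
  }
}

/** Subscriber side: restarts everything whenever a new configuration arrives. */
class MiniApplication {
  final List<String> log = new ArrayList<>();
  private Conf current;

  synchronized void handleConfigurationEvent(Conf conf) {
    stopAll();
    startAll(conf);
  }

  private void startAll(Conf conf) {
    current = conf;
    conf.channels.forEach(c -> log.add("start " + c));      // storage first
    conf.sinkRunners.forEach(k -> log.add("start " + k));   // then consumers
    conf.sourceRunners.forEach(s -> log.add("start " + s)); // producers last
  }

  private void stopAll() {
    if (current == null) return;
    current.sourceRunners.forEach(s -> log.add("stop " + s)); // stop producing first
    current.sinkRunners.forEach(k -> log.add("stop " + k));
    current.channels.forEach(c -> log.add("stop " + c));
  }
}

/** Publisher side: posts a freshly parsed config only when the file timestamp changed. */
class PollingProvider {
  private final MiniBus<Conf> bus;
  private long lastModified = Long.MIN_VALUE;

  PollingProvider(MiniBus<Conf> bus) { this.bus = bus; }

  /** Meant to run on a timer (main() above uses 30 s); the caller supplies mtime and parser. */
  void poll(long fileLastModified, Supplier<Conf> parse) {
    if (fileLastModified != lastModified) {
      lastModified = fileLastModified;
      bus.post(parse.get());
    }
  }
}
```

Stopping sources first means no new events arrive while sinks drain and channels shut down. Starting channels first means sources never write into a channel that does not exist yet.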

    LifecycleSupervisor

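    • The supervisor (watchdog): it monitors components and restarts them.

    • My note: every component that needs to be monitored and restarted should implement LifecycleAware.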

      public class LifecycleSupervisor implements LifecycleAware {
        public LifecycleSupervisor() {
          lifecycleState = LifecycleState.IDLE;
          // supervised components and their supervision records
          supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
          // the scheduled monitor task of each supervised component
          monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
          // thread pool that runs the monitor tasks
          monitorService = new ScheduledThreadPoolExecutor(10,
              new ThreadFactoryBuilder().setNameFormat(
                  "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
                  .build());
          monitorService.setMaximumPoolSize(20);
          monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
          // periodically purges cancelled monitor tasks
          purger = new Purger();
          // nothing to purge initially
          needToPurge = false;
        }
        ... // start() & stop()...

        // put a component under supervision
        public synchronized void supervise(LifecycleAware lifecycleAware,
            SupervisorPolicy policy, LifecycleState desiredState) {
          if (this.monitorService.isShutdown()
              || this.monitorService.isTerminated()
              || this.monitorService.isTerminating()) {
            ...// throw an exception if the supervisor has already been stopped
          }

          // create the supervision record
          Supervisoree process = new Supervisoree();
          process.status = new Status();

          // the supervision policy; Application always passes AlwaysRestartPolicy (restart on failure)
          process.policy = policy;
          process.status.desiredState = desiredState;  // the desired state, usually START
          process.status.error = false;

          // the component monitor periodically checks the component's latest state and starts,
          // stops or restarts it as needed. MonitorRunnable is described below.
          MonitorRunnable monitorRunnable = new MonitorRunnable();
          monitorRunnable.lifecycleAware = lifecycleAware;
          monitorRunnable.supervisoree = process;
          monitorRunnable.monitorService = monitorService;

          supervisedProcesses.put(lifecycleAware, process);

          // run monitorRunnable with a fixed delay: first run immediately, then 3 s after each run finishes
          // scheduleWithFixedDelay: Creates and executes a periodic action. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.
          // So the task must catch every exception itself, or the periodic checks stop.
          ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
              monitorRunnable, 0, 3, TimeUnit.SECONDS);
          monitorFutures.put(lifecycleAware, future);
        }
    • MonitorRunnable: drives component state transitions and recovers failed components

      public static class MonitorRunnable implements Runnable {

          public ScheduledExecutorService monitorService;
          public LifecycleAware lifecycleAware;
          public Supervisoree supervisoree;

          @Override
          public void run() {
            long now = System.currentTimeMillis();

            try {
              if (supervisoree.status.firstSeen == null) {
                logger.debug("first time seeing {}", lifecycleAware);

                supervisoree.status.firstSeen = now; // time of the first status check
              }

              supervisoree.status.lastSeen = now; // time of the latest status check
              synchronized (lifecycleAware) {
                // if the component has been discarded or is in error, return immediately
                if (supervisoree.status.discard) {
                  // i.e. unsupervise has already been called
                  logger.info("Component has already been stopped {}", lifecycleAware);
                  return;
                } else if (supervisoree.status.error) {
                  logger.info("Component {} is in error state, and Flume will not"
                      + "attempt to change its state", lifecycleAware);
                  return;
                }

                // record the most recently observed state
                supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

                // if the component's actual state differs from the desired state,
                // the desired state wins: drive the component toward it
                if (!lifecycleAware.getLifecycleState().equals(
                    supervisoree.status.desiredState)) {
                  switch (supervisoree.status.desiredState) {
                      // desired START: start the component (this is how each component is started in the first place)
                    case START:
                      try {
                        lifecycleAware.start();
                      } catch (Throwable e) {
                        logger.error("Unable to start " + lifecycleAware
                            + " - Exception follows.", e);
                        if (e instanceof Error) {
                          // This component can never recover, shut it down.
                          supervisoree.status.desiredState = LifecycleState.STOP;
                          try {
                            lifecycleAware.stop();
                            logger.warn("Component {} stopped, since it could not be"
                                + "successfully started due to missing dependencies",
                                lifecycleAware);
                          } catch (Throwable e1) {
                            logger.error("Unsuccessful attempt to "
                                + "shutdown component: {} due to missing dependencies."
                                + " Please shutdown the agent"
                                + "or disable this component, or the agent will be"
                                + "in an undefined state.", e1);
                            supervisoree.status.error = true;
                            if (e1 instanceof Error) {
                              throw (Error) e1;
                            }
                            // Set the state to stop, so that the conf poller can
                            // proceed.
                          }
                        }
                        supervisoree.status.failures++;
                      }
                      break;
                    case STOP:
                      try {
                        lifecycleAware.stop();
                      } catch (Throwable e) {
                        logger.error("Unable to stop " + lifecycleAware
                            + " - Exception follows.", e);
                        if (e instanceof Error) {
                          throw (Error) e;
                        }
                        supervisoree.status.failures++;
                      }
                      break;
                    default:
                      logger.warn("I refuse to acknowledge {} as a desired state",
                          supervisoree.status.desiredState);
                  }

                  if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
                    logger.error(
                        "Policy {} of {} has been violated - supervisor should exit!",
                        supervisoree.policy, lifecycleAware);
                  }
                }
              }
            } catch (Throwable t) {
              logger.error("Unexpected error", t);
            }
            logger.debug("Status check complete");
          }
        }
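The core of supervise() plus MonitorRunnable is a periodic reconciliation loop. Each pass compares the actual state with the desired state and acts if they differ. The task must never let an exception escape, because scheduleWithFixedDelay suppresses all later runs once one run throws. Below is a minimal sketch under those assumptions. Managed, FlakyComponent, ReconcileTask and MiniSupervisor are hypothetical names. The sketch leaves out the real code's Error handling, discard/error flags and SupervisorPolicy.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

enum DesiredState { START, STOP }

/** Hypothetical component contract, loosely modeled on LifecycleAware. */
interface Managed {
  void start();
  void stop();
  boolean isRunning();
}

/** Test double whose start() throws a fixed number of times before succeeding. */
class FlakyComponent implements Managed {
  private int failuresLeft;
  private boolean running;
  int startAttempts;

  FlakyComponent(int failuresBeforeSuccess) { this.failuresLeft = failuresBeforeSuccess; }

  @Override public void start() {
    startAttempts++;
    if (failuresLeft > 0) {
      failuresLeft--;
      throw new IllegalStateException("dependency not ready");
    }
    running = true;
  }

  @Override public void stop() { running = false; }

  @Override public boolean isRunning() { return running; }
}

/** One reconciliation pass, in the spirit of MonitorRunnable. */
class ReconcileTask implements Runnable {
  private final Managed component;
  volatile DesiredState desired = DesiredState.START;
  volatile int failures;

  ReconcileTask(Managed component) { this.component = component; }

  @Override public void run() {
    try {
      synchronized (component) {
        boolean wantRunning = desired == DesiredState.START;
        if (component.isRunning() != wantRunning) {
          if (wantRunning) component.start(); else component.stop();
        }
      }
    } catch (RuntimeException e) {
      // Count and swallow: if run() threw, scheduleWithFixedDelay would
      // suppress every later execution and the component would stay down.
      failures++;
    }
  }
}

/** Schedules reconciliation tasks on a shared pool. */
class MiniSupervisor {
  private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

  ScheduledFuture<?> supervise(ReconcileTask task, long delaySeconds) {
    // First run immediately, then delaySeconds after each run finishes.
    return pool.scheduleWithFixedDelay(task, 0, delaySeconds, TimeUnit.SECONDS);
  }

  void stop() { pool.shutdownNow(); }
}
```

Calling run() directly, as a scheduler would every few seconds, shows the restart-until-healthy behavior. A component whose start() fails twice comes up on the third pass.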

    Source

    SourceRunner

    • First, SourceRunner, which controls how a source is driven.

    • It is an abstract class meant to be extended by derived classes. A static factory method inside it instantiates the right runner for a given source.

        // static factory method that instantiates a source runner based on the type of the given source
        // input: the source to run; returns a runner that can run that source
        public static SourceRunner forSource(Source source) {
          SourceRunner runner = null;

          if (source instanceof PollableSource) {
            runner = new PollableSourceRunner();
            ((PollableSourceRunner) runner).setSource((PollableSource) source);
          } else if (source instanceof EventDrivenSource) {
            runner = new EventDrivenSourceRunner();
            ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
          } else {
            throw new IllegalArgumentException("No known runner type for source "
                + source);
          }

          return runner;
        }

    EventDrivenSourceRunner

    • Starts, stops, and manages EventDrivenSource (event-driven) sources.

    • It has the following methods:

      • Constructor

        public EventDrivenSourceRunner() {
            lifecycleState = LifecycleState.IDLE;
          }
      • start()

        @Override
          public void start() {
            Source source = getSource(); // get the source
            ChannelProcessor cp = source.getChannelProcessor(); // the channel processor
            cp.initialize(); // initialize the channel processor
            source.start();  // start the source
            lifecycleState = LifecycleState.START; // mark this runner as started
          }
      • stop()、toString()、getLifecycleState()

    PollableSourceRunner

    public class PollableSourceRunner extends SourceRunner {
     @Override
     public void start() {
      PollableSource source = (PollableSource) getSource(); // get the source
      ChannelProcessor cp = source.getChannelProcessor(); // the channel processor
      cp.initialize(); // initialize the channel processor
      source.start();  // start the source

      runner = new PollingRunner();  // create a PollingRunner thread that pulls data
      runner.source = source;
      runner.counterGroup = counterGroup;
      runner.shouldStop = shouldStop;

      runnerThread = new Thread(runner);
      runnerThread.setName(getClass().getSimpleName() + "-" +
          source.getClass().getSimpleName() + "-" + source.getName());
      runnerThread.start();

      lifecycleState = LifecycleState.START;
     }
    }
    • The PollingRunner thread

    @Override
      public void run() {
        while (!shouldStop.get()) { // keep looping until asked to stop
          counterGroup.incrementAndGet("runner.polls"); // atomic increment

          try {
            // call PollableSource.process() to poll, then check whether the source asked to back off
            if (source.process().equals(PollableSource.Status.BACKOFF)) {
              counterGroup.incrementAndGet("runner.backoffs");

              // back off: sleep for a time that grows with consecutive backoffs, capped at a maximum, then retry
              Thread.sleep(Math.min(
                  counterGroup.incrementAndGet("runner.backoffs.consecutive")
                  * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
            } else {
              counterGroup.set("runner.backoffs.consecutive", 0L);
            }
          } catch (InterruptedException e) {
            // ... (handling of this and other exceptions omitted in these notes)
          }
        }
      }
    • TODO
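PollingRunner's backoff grows linearly: the number of consecutive backoffs times the increment, capped at a maximum. This differs from the exponential penalty described for failed sinks in the overview. The arithmetic can be isolated in a small sketch. PollStatus and LinearBackoff are hypothetical names. The 1000 ms / 5000 ms values used to check it are arbitrary, not Flume defaults.

```java
/** Outcome of one process() call, mirroring PollableSource.Status READY / BACKOFF. */
enum PollStatus { READY, BACKOFF }

/** Tracks consecutive BACKOFFs and computes the sleep, following PollingRunner.run(). */
class LinearBackoff {
  private final long incrementMs;
  private final long maxMs;
  private long consecutive;

  LinearBackoff(long incrementMs, long maxMs) {
    this.incrementMs = incrementMs;
    this.maxMs = maxMs;
  }

  /** Returns how long the runner should sleep after a process() call with this status. */
  long sleepAfter(PollStatus status) {
    if (status == PollStatus.BACKOFF) {
      consecutive++; // like counterGroup.incrementAndGet("runner.backoffs.consecutive")
      return Math.min(consecutive * incrementMs, maxMs);
    }
    consecutive = 0; // like counterGroup.set("runner.backoffs.consecutive", 0L)
    return 0;
  }
}
```

A single READY resets the streak, so the next BACKOFF sleeps for one increment again.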

    Source

    public interface Source extends LifecycleAware, NamedComponent {
      public void setChannelProcessor(ChannelProcessor channelProcessor);
      public ChannelProcessor getChannelProcessor();
    } 
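    • It extends LifecycleAware and adds only a setter and a getter for the ChannelProcessor. Specifically:

      • all of a source's logic should be implemented in LifecycleAware's start and stop methods;

      • the ChannelProcessor filters the event stream and selects and dispatches to the channels.

    • From the runners above, sources come in two kinds: PollableSource (polling) and EventDrivenSource (event-driven).

    • A source's job is to listen for logs, collect them, and hand them to the ChannelProcessor.

    EventDrivenSource

    • An event-driven source does not need an external driver to fetch events. EventDrivenSource is an empty interface that extends Source.

    • (To be continued from here.)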

      

    Channel

    • Channels decouple sources from sinks. This allows many-to-many associations and lets sources and sinks run asynchronously.

    • Channel exposes a transaction interface that can be used by its clients to ensure atomic put(Event) and take() semantics.

    ChannelProcessor

    • As noted earlier, a source hands the logs it collects to the ChannelProcessor, so that is where we start. It depends on the following components:

      private final ChannelSelector selector;  // channel selector, .flume.ChannelSelector
      private final InterceptorChain interceptorChain; // interceptor chain, .flume.interceptor.InterceptorChain
      private ExecutorService execService; // ExecutorService for optional channels, single-threaded by default [Note: I saw this in a blog post, but could not find this field in ChannelProcessor]
    • Here is how ChannelProcessor processes an Event:

      // Attempts to put the given event into each configured channel
      public void processEvent(Event event) {

        event = interceptorChain.intercept(event); // run the interceptor chain first. TODO: intercept...
        // InterceptorChain implements the Interceptor interface and calls a list of other
        // Interceptors to filter and transform the event; see below
        if (event == null) {
          return;
        }

        // Process required channels
        // ask the channel selector for the channels that must succeed, and write to each one in a transaction
        List<Channel> requiredChannels = selector.getRequiredChannels(event);
        for (Channel reqChannel : requiredChannels) {
          Transaction tx = reqChannel.getTransaction();  // every Channel implementation provides getTransaction(). TODO: getTransaction
          Preconditions.checkNotNull(tx, "Transaction object must not be null");
          try {
            tx.begin(); // begin the transaction

            reqChannel.put(event);  // put the event into reqChannel

            tx.commit();   // commit the transaction
          } catch (Throwable t) {
            tx.rollback();  // on any Throwable (Error or Exception), roll the transaction back
            if (t instanceof Error) {
              LOG.error("Error while writing to required channel: " + reqChannel, t);
              throw (Error) t;
            } else if (t instanceof ChannelException) {
              throw (ChannelException) t;
            } else {
              throw new ChannelException("Unable to put event on required " +
                  "channel: " + reqChannel, t);  // TODO: presumably the caller handles the ChannelException; otherwise how is delivery to required channels guaranteed?
            }
          } finally {
            if (tx != null) {
              tx.close();  // finally, close the transaction if it is non-null
            }
          }
        }

        // Process optional channels
        // ask the channel selector for the optional channels; failures on these (other than Errors) are logged and ignored, and do not affect the other channels
        List<Channel> optionalChannels = selector.getOptionalChannels(event);
        for (Channel optChannel : optionalChannels) {
          Transaction tx = null;
          try {
            tx = optChannel.getTransaction();
            tx.begin();

            optChannel.put(event);

            tx.commit();
          } catch (Throwable t) {
            tx.rollback();
            LOG.error("Unable to put event on optional channel: " + optChannel, t);
            if (t instanceof Error) {
              throw (Error) t;
            }
          } finally {
            if (tx != null) {
              tx.close();
            }
          }
        }
      }
    • Next, the channel classes implemented in Flume.

    The Channel interface

    public interface Channel extends LifecycleAware, NamedComponent {
      // put() and get() must be invoked within an active Transaction boundary
      public void put(Event event) throws ChannelException;
      public Event take() throws ChannelException;
      // @return: the transaction instance associated with this channel
      public Transaction getTransaction();
    }
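The required/optional split in ChannelProcessor.processEvent can be reduced to a small sketch. It is written against a hypothetical TxChannel interface, not Flume's Channel/Transaction API, and it omits close() and the interceptor step. It shows the two behaviors from the code above. A failure on a required channel is rolled back and propagated, so later channels are never written for that event. A failure on an optional channel is rolled back, recorded and skipped. ListChannel and Dispatcher are also hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical one-transaction-at-a-time channel; not Flume's Channel/Transaction API. */
interface TxChannel {
  String name();
  void begin();
  void put(String event);
  void commit();
  void rollback();
}

/** In-memory test channel; when failPuts is set, every put() throws. */
class ListChannel implements TxChannel {
  private final String name;
  private final boolean failPuts;
  private final List<String> pending = new ArrayList<>();
  final List<String> committed = new ArrayList<>();

  ListChannel(String name, boolean failPuts) {
    this.name = name;
    this.failPuts = failPuts;
  }

  public String name() { return name; }
  public void begin() { pending.clear(); }
  public void put(String event) {
    if (failPuts) throw new IllegalStateException(name + " is full");
    pending.add(event);
  }
  public void commit() { committed.addAll(pending); pending.clear(); }
  public void rollback() { pending.clear(); }
}

/** Required channels must all succeed; optional ones are best effort. */
class Dispatcher {
  final List<String> optionalFailures = new ArrayList<>();

  void processEvent(String event, List<? extends TxChannel> required,
                    List<? extends TxChannel> optional) {
    for (TxChannel ch : required) {
      ch.begin();
      try {
        ch.put(event);
        ch.commit();
      } catch (RuntimeException e) {
        ch.rollback();
        // Propagate to the caller (in Flume, the source), which must deal with the failure.
        throw new IllegalStateException("Unable to put event on required channel " + ch.name(), e);
      }
    }
    for (TxChannel ch : optional) {
      ch.begin();
      try {
        ch.put(event);
        ch.commit();
      } catch (RuntimeException e) {
        ch.rollback();
        optionalFailures.add(ch.name()); // Flume logs the error and moves on
      }
    }
  }
}
```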

    AbstractChannel

    • abstract class AbstractChannel implements Channel, LifecycleAware, Configurable

    • Manages the lifecycle state transitions (in the constructor, start(), and stop()) and provides an empty configure() method. It does no channel-specific processing.

    BasicChannelSemantics

    • Implements basic channel semantics, including the thread-local semantics for Transaction.

    public abstract class BasicChannelSemantics extends AbstractChannel {

      // 1. transactions are stored in a ThreadLocal, so each thread works with its own transaction
      private ThreadLocal<BasicTransactionSemantics> currentTransaction
          = new ThreadLocal<BasicTransactionSemantics>();

      private boolean initialized = false;

      protected void initialize() {}   // 2. hook for one-time initialization work

      // 3. callback through which implementations (subclasses) create a transaction.
      // It must return a new Transaction object that extends BasicTransactionSemantics.
      // For example, MemoryChannel overrides it to return an instance of its private inner class
      // MemoryTransaction, which extends BasicTransactionSemantics.
      // MemoryTransaction implements putList and takeList with two LinkedBlockingDeques
      // (concurrent blocking double-ended queues); more on this with MemoryChannel below. TODO
      protected abstract BasicTransactionSemantics createTransaction();

      // 4. putting an event into the channel is delegated directly to the transaction's put method:
      // make sure this thread has a transaction, then delegate to that BasicTransactionSemantics instance
      @Override
      public void put(Event event) throws ChannelException {
        // currentTransaction is a ThreadLocal<BasicTransactionSemantics>,
        // so this fetches the current thread's transaction
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        transaction.put(event);
      }

      // 5. taking an event from the channel is likewise delegated to the transaction's take method
      @Override
      public Event take() throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        return transaction.take();
      }

      @Override
      public Transaction getTransaction() {
        // 1. if the channel is not ready yet, initialize it
        if (!initialized) {
          synchronized (this) {
            if (!initialized) {
              initialize();
              initialized = true;
            }
          }
        }

        // 2. if the current thread has no open transaction (none yet, or the last one is closed),
        //    create one and bind it to currentTransaction
        BasicTransactionSemantics transaction = currentTransaction.get();
        if (transaction == null || transaction.getState().equals(
                BasicTransactionSemantics.State.CLOSED)) {
          transaction = createTransaction();
          currentTransaction.set(transaction);
        }
        return transaction;
      }
    }
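Here is a minimal sketch of the two ideas above. First, there is one transaction per thread via ThreadLocal. Second, a putList/takeList pair stages events until commit or rollback, which is the scheme MemoryTransaction uses. SketchChannel and Tx are hypothetical. They use plain ArrayDeques and a single lock. They leave out initialize(), capacities and the semaphores of the real MemoryChannel.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical simplified memory channel; not Flume's BasicChannelSemantics API. */
class SketchChannel {
  private final Object queueLock = new Object();
  private final Deque<String> queue = new ArrayDeque<>(); // guarded by queueLock
  // One transaction per thread, as with BasicChannelSemantics.currentTransaction.
  private final ThreadLocal<Tx> currentTransaction = new ThreadLocal<>();

  final class Tx {
    private final Deque<String> putList = new ArrayDeque<>();
    private final Deque<String> takeList = new ArrayDeque<>();
    private boolean closed;

    /** Stage an event; takers cannot see it until commit(). */
    void put(String event) { putList.addLast(event); }

    /** Move one event from the shared queue into this transaction's takeList. */
    String take() {
      String event;
      synchronized (queueLock) { event = queue.pollFirst(); }
      if (event != null) takeList.addLast(event);
      return event;
    }

    void commit() {
      synchronized (queueLock) {
        while (!putList.isEmpty()) queue.addLast(putList.removeFirst());
      }
      takeList.clear(); // taken events are now consumed for good
    }

    void rollback() {
      synchronized (queueLock) {
        // Re-insert from the tail of takeList so the queue regains its original order.
        while (!takeList.isEmpty()) queue.addFirst(takeList.removeLast());
      }
      putList.clear(); // staged puts are discarded
    }

    void close() { closed = true; }
  }

  /** Returns this thread's open transaction, creating a new one if none is open. */
  Tx getTransaction() {
    Tx tx = currentTransaction.get();
    if (tx == null || tx.closed) {
      tx = new Tx();
      currentTransaction.set(tx);
    }
    return tx;
  }

  int size() {
    synchronized (queueLock) { return queue.size(); }
  }
}
```

Because puts only become visible at commit and takes return to the head of the queue on rollback, a failed sink transaction redelivers the same events in the same order.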

    MemoryChannel

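    • Recommended when writing to disk is impractical or durability is not required, and for unit tests.

    • Most channels delegate put and take to their transactions.

    • A purely in-memory channel: all transaction operations happen in memory.

    • Each transaction has a takeList and a putList that stage the events taken and put within the transaction. On commit they are synchronized into the channel queue. On failure, the taken events are rolled back into the channel queue. TODO: understand overall when commit and rollback happen.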

      public class MemoryChannel extends BasicChannelSemantics {
        // TODO: about factory
        private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
        ...// constants, e.g. the defaults defaultCapacity, defaultTransCapacity, byteCapacitySlotSize..

        // inner class extending BasicTransactionSemantics. TODO: About BasicTransactionSemantics
        private class MemoryTransaction extends BasicTransactionSemantics {
          // each transaction has two concurrent blocking deques. TODO: LinkedBlockingDeque
          private LinkedBlockingDeque<Event> takeList;
          private LinkedBlockingDeque<Event> putList;
          private final ChannelCounter channelCounter;
          ...//
          public MemoryTransaction(int transCapacity, ChannelCounter counter) {
            putList = new LinkedBlockingDeque<Event>(transCapacity);
            takeList = new LinkedBlockingDeque<Event>(transCapacity);

            channelCounter = counter;
          }

          // put the event into putList.
          // doPut is fairly simple: it offers the event to the transaction's putList and, if the list
          // is full, throws so the transaction is rolled back. Otherwise the event stays staged in
          // putList until commit moves it into the channel queue. It also adds to the put byte counter,
          // which is used later to enforce the byte-capacity limit
          @Override
          protected void doPut(Event event) throws InterruptedException {
            // channelCounter counts put attempts, take attempts, successes, etc.
            channelCounter.incrementEventPutAttemptCount(); // count this put attempt
            // estimateEventSize estimates the size of the event body, expressed here in byteCapacitySlotSize units
            int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

            // offer the event to the transaction's putList; if it is full, throw so the transaction rolls back
            if (!putList.offer(event)) {
              throw new ChannelException(
                  "Put queue for MemoryTransaction of capacity " +
                  putList.size() + " full, consider committing more frequently, " +
                  "increasing capacity or increasing thread count");
            }
            putByteCounter += eventByteSize; // add to the put byte counter
          }

          // take an event from the channel queue and stage it in takeList
          @Override
          protected Event doTake() throws InterruptedException {
            channelCounter.incrementEventTakeAttemptCount();
            // takeList has no capacity left, i.e. this transaction has already taken its maximum number of events
            if (takeList.remainingCapacity() == 0) {
              throw new ChannelException("Take list for MemoryTransaction, capacity " +
                  takeList.size() + " full, consider committing more frequently, " +
                  "increasing capacity, or increasing thread count");
            }
            // queueStored counts the events stored in the queue; try to acquire one permit, returning null on timeout
            if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
              return null;
            }
            Event event;
            // take one event from the channel queue; every access to the queue must hold queueLock
            synchronized (queueLock) {
              event = queue.poll();
            }
            // the semaphore guarantees that an event exists, so a null here would be abnormal
            Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
                "signalling existence of entry");
            // stage it in the transaction's takeList
            takeList.put(event);
            // estimate the event body size and add it to the take byte counter
            int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
            takeByteCounter += eventByteSize;

            return event;
          }

          // only when the transaction commits is its putList synchronized into the channel queue
          @Override
          protected void doCommit() throws InterruptedException {
            // 1. net change in the number of events: number taken minus number put
            int remainingChange = takeList.size() - putList.size();
            if (remainingChange < 0) {
              // bytesRemaining is the byte-capacity semaphore; if capacity is exceeded, the commit fails and the transaction is rolled back
              if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
                throw new ChannelException("Cannot commit transaction. Byte capacity " +
                    "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
                    "reached. Please increase heap space/byte capacity allocated to " +
                    "the channel as the sinks may not be keeping up with the sources");
              }
              if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
                bytesRemaining.release(putByteCounter);
                throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
                    " Sinks are likely not keeping up with sources, or the buffer size is too tight");
              }
            }
            int puts = putList.size();
            int takes = takeList.size();
            synchronized (queueLock) {
              if (puts > 0) {
                while (!putList.isEmpty()) {
                  if (!queue.offer(putList.removeFirst())) { // offer() appends to the tail; returns false if the queue is full
                    throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
                  }
                }
              }
              putList.clear();
              takeList.clear();
            }
            bytesRemaining.release(takeByteCounter);
            takeByteCounter = 0;
            putByteCounter = 0;

            queueStored.release(puts);
            if (remainingChange > 0) {
              queueRemaining.release(remainingChange);
            }
            if (puts > 0) {
              channelCounter.addToEventPutSuccessCount(puts);
            }
            if (takes > 0) {
              channelCounter.addToEventTakeSuccessCount(takes);
            }

            channelCounter.setChannelSize(queue.size());
          }

          // 事务失败时,将take list数据回滚到Channel Queue
          // 在回滚时,需要把takeList中暂存的事件回滚到Channel Queue,并回滚queueStored信号量。
          @Override
          protected void doRollback() {
            int takes = takeList.size();
            synchronized (queueLock) {
              Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
                  "Not enough space in memory channel " +
                  "queue to rollback takes. This should never happen, please report");
              while (!takeList.isEmpty()) {
                queue.addFirst(takeList.removeLast());
              }
              putList.clear();
            }
            bytesRemaining.release(putByteCounter);
            putByteCounter = 0;
            takeByteCounter = 0;

            queueStored.release(takes);
            channelCounter.setChannelSize(queue.size());
          }

        }

        private Object queueLock = new Object();
        // Every access to the channel queue must hold this lock, because the queue can be resized (replaced by a newly created deque). Usage: synchronized (queueLock) { ...operate on queue... }
        @GuardedBy(value = "queueLock")  // @GuardedBy tells maintainers which lock protects this field
        private LinkedBlockingDeque<Event> queue; // a single channel queue stores all events of the channel

        // A Semaphore limits how many permits can be held at once: acquire() takes a permit, blocking until one is available, and release() returns one
        // queueRemaining: remaining free capacity of the queue, in events. It is acquired or released when a transaction commits.
        // 1. configure() initializes it with `capacity` permits.
        // 2. resizeQueue(): to shrink, oldCapacity - capacity permits must be acquired first; if they cannot be, the shrink is refused (sensibly, since otherwise events would be lost). To grow, queueRemaining.release(capacity - oldCapacity) is called.
        // 3. On commit, if takeList.size() < putList.size(), enough queueRemaining permits must be available for the difference.
        private Semaphore queueRemaining;

        // queueStored: number of events currently stored in the channel queue
        // 1. configure() initializes it as a semaphore with 0 permits
        // 2. doTake() calls tryAcquire() to check whether an event is available
        // 3. commit calls release(puts), adding one permit per committed put
        // 4. rollback calls release(takes), since the taken events return to the queue
        private Semaphore queueStored;

        // maximum items in a transaction queue
        private volatile Integer transCapacity;
        private volatile int keepAlive;
        private volatile int byteCapacity;
        private volatile int lastByteCapacity;
        private volatile int byteCapacityBufferPercentage;
        private Semaphore bytesRemaining;
        private ChannelCounter channelCounter;

        public MemoryChannel() {
          super();
        }

        @Override
        public void configure(Context context) {
          // Read parameters from context
          // capacity, transactionCapacity, byteCapacity, byteCapacityBufferPercentage, ...
        }

        // Many transactions operate on the channel queue and the queue can also be resized, so MemoryChannel guards it with a lock; capacity limits are enforced with semaphores.
        // The queue's capacity is changed by creating a new LinkedBlockingDeque and copying the contents of the old queue into it.
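        private void resizeQueue(int capacity) throws InterruptedException {
          int oldCapacity;
          // Current capacity of the queue; this read must also hold the lock
          synchronized (queueLock) {
            oldCapacity = queue.size() + queue.remainingCapacity();
          }

          if (oldCapacity == capacity) {
            return;
          } else if (oldCapacity > capacity) {
            // tryAcquire(n, timeout, unit): try to obtain n permits from the semaphore, waiting up to the timeout
            // Shrinking: first reserve (oldCapacity - capacity) permits. If that fails, the default is to log a warning and skip the resize
            if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
              LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
            } else {
              // Otherwise shrink: build the smaller queue and copy the old events; this holds queueLock so the whole swap is thread-safe
              synchronized (queueLock) {
                LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
                newQueue.addAll(queue);
                queue = newQueue;
              }
            }
          } else {
            // Growing: copy into a larger queue, then release the extra (capacity - oldCapacity) permits
            synchronized (queueLock) {
              LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
              newQueue.addAll(queue);
              queue = newQueue;
            }
            queueRemaining.release(capacity - oldCapacity);
          }
        }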
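The transaction bookkeeping above (putList/takeList plus the queueRemaining and queueStored semaphores) can be condensed into a small runnable model. This is a simplified sketch under stated assumptions, not Flume's MemoryChannel: the class `MiniMemoryChannel` and its `Tx` type are hypothetical, events are plain strings, and byte-capacity accounting, keepAlive timeouts and counters are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;

// Hypothetical, simplified model of MemoryChannel's transaction bookkeeping.
// Byte-capacity accounting, keepAlive timeouts and counters are deliberately omitted.
class MiniMemoryChannel {
  private final Object queueLock = new Object();
  private final LinkedBlockingDeque<String> queue;
  private final Semaphore queueRemaining;                 // free slots, settled at commit
  private final Semaphore queueStored = new Semaphore(0); // committed events available to take

  MiniMemoryChannel(int capacity) {
    queue = new LinkedBlockingDeque<>(capacity);
    queueRemaining = new Semaphore(capacity);
  }

  int size() {
    synchronized (queueLock) {
      return queue.size();
    }
  }

  Tx begin() {
    return new Tx();
  }

  class Tx {
    private final Deque<String> putList = new ArrayDeque<>();
    private final Deque<String> takeList = new ArrayDeque<>();

    void put(String event) {
      putList.addLast(event); // buffered, invisible to takers until commit
    }

    String take() {
      if (!queueStored.tryAcquire()) {
        return null; // no committed event available
      }
      String event;
      synchronized (queueLock) {
        event = queue.poll();
      }
      takeList.addLast(event); // remembered so rollback can restore it
      return event;
    }

    void commit() {
      int remainingChange = takeList.size() - putList.size();
      // Only a net growth of the queue needs free-slot permits.
      if (remainingChange < 0 && !queueRemaining.tryAcquire(-remainingChange)) {
        throw new IllegalStateException("channel full");
      }
      int puts = putList.size();
      synchronized (queueLock) {
        while (!putList.isEmpty()) {
          if (!queue.offer(putList.removeFirst())) {
            throw new IllegalStateException("queue add failed");
          }
        }
        takeList.clear();
      }
      queueStored.release(puts);
      if (remainingChange > 0) {
        queueRemaining.release(remainingChange); // net shrink frees slots
      }
    }

    void rollback() {
      int takes = takeList.size();
      synchronized (queueLock) {
        while (!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast()); // restores the original order at the head
        }
        putList.clear(); // uncommitted puts are simply discarded
      }
      queueStored.release(takes);
    }
  }
}
```

Note that a take only consumes a queueStored permit; its slot goes back to queueRemaining at commit time, so while a taking transaction is still open its slot cannot be claimed by another transaction's puts. This is what guarantees that rollback always has room to push the taken events back.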

    Interceptor

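    • Flume ships with many built-in Interceptor implementations, as shown in the figure below:

    • It also provides InterceptorChain, which applies a sequence of interceptors to each event as a chain.

    InterceptorChain

    • Implementation of Interceptor that calls a list of other Interceptors

    • The Interceptor interface filters and transforms events: it returns the original or a modified event, or null if the event should be dropped.

    • InterceptorChain itself simply calls the corresponding method of each Interceptor implementation it holds, one after another (as a chain), on the event.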

      public class InterceptorChain implements Interceptor {

        // list of interceptors that will be traversed, in order
        private List<Interceptor> interceptors;

        public InterceptorChain() {
          interceptors = Lists.newLinkedList();  // constructor: start with an empty LinkedList
        }

        public void setInterceptors(List<Interceptor> interceptors) {
          this.interceptors = interceptors;  // setter
        }

        // Interceptor.intercept(Event): interception of a single event
        // @return: Original or modified event, or null if the Event is to be dropped.
        @Override
        public Event intercept(Event event) {
          for (Interceptor interceptor : interceptors) {
            if (event == null) {
              return null;  // a previous interceptor dropped the event
            }
            // interceptors is populated through setInterceptors() with instances of concrete Interceptor
            // implementations, so this call dispatches to that implementation's intercept()
            // (see the HostInterceptor example below)
            event = interceptor.intercept(event);
          }
          return event;
        }

        // Interceptor: Interception of a batch of events
        // @return: Output list of events
        @Override
        public List<Event> intercept(List<Event> events) {
          ... // essentially the same as above, but calls interceptor.intercept(events)
        }

        // Interceptor: Any initialization / startup needed by the Interceptor.
        @Override
        public void initialize() {
          Iterator<Interceptor> iter = interceptors.iterator();
          while (iter.hasNext()) {
            Interceptor interceptor = iter.next();
            interceptor.initialize();  // initialize each interceptor in the list, in order
          }
        }

        @Override
        public void close() {
          ... // close each interceptor in the list, in order
        }
      }
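The chaining rule above (each interceptor receives the previous one's output, and a null result short-circuits the rest of the chain) can be tried out without Flume on the classpath. A minimal sketch, assuming hypothetical `SimpleEvent`/`SimpleInterceptor` types in place of Flume's `Event` and `Interceptor`; the batch method here just applies the single-event chain and filters out dropped events, a simplification of the elided batch logic above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for Flume's Event.
class SimpleEvent {
  final Map<String, String> headers = new HashMap<>();
  final String body;

  SimpleEvent(String body) {
    this.body = body;
  }
}

// Hypothetical stand-in for Flume's Interceptor (single-event method only).
interface SimpleInterceptor {
  // Returns the original or a modified event, or null to drop it.
  SimpleEvent intercept(SimpleEvent event);
}

class SimpleInterceptorChain implements SimpleInterceptor {
  private final List<SimpleInterceptor> interceptors;

  SimpleInterceptorChain(List<SimpleInterceptor> interceptors) {
    this.interceptors = interceptors;
  }

  @Override
  public SimpleEvent intercept(SimpleEvent event) {
    for (SimpleInterceptor interceptor : interceptors) {
      if (event == null) {
        return null; // an earlier interceptor dropped the event
      }
      event = interceptor.intercept(event);
    }
    return event;
  }

  // Batch version (simplified): run the chain per event and keep the survivors.
  List<SimpleEvent> intercept(List<SimpleEvent> events) {
    List<SimpleEvent> out = new ArrayList<>();
    for (SimpleEvent e : events) {
      SimpleEvent result = intercept(e);
      if (result != null) {
        out.add(result);
      }
    }
    return out;
  }
}
```

Because an interceptor may return null, the order of the list matters: placing a filtering interceptor first avoids running the later ones on events that will be dropped anyway.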

    HostInterceptor

    • implements Interceptor

    • Purpose: adds the local host name or IP address to the header of every intercepted event.

      public class HostInterceptor implements Interceptor {
        ... // some private fields
        /**
         * Only {@link HostInterceptor.Builder} can build me
         */
        // The constructor is private, so instances can only be created through the static nested Builder class below
        private HostInterceptor(boolean preserveExisting,
            boolean useIP, String header) {
          // These values come from the agent's xx.conf (read in Builder.configure())
          this.preserveExisting = preserveExisting;
          this.header = header;
          InetAddress addr;
          try {
            addr = InetAddress.getLocalHost(); // Returns the address of the local host.
            if (useIP) {
              // Returns the IP address string in textual presentation
              host = addr.getHostAddress();
            } else {
              // Gets the fully qualified domain name for this IP address.
              host = addr.getCanonicalHostName();
            }
          } catch (UnknownHostException e) {
            logger.warn("Could not get local host address. Exception follows.", e);
          }

        }

        @Override
        public void initialize() {
          // no-op
        }

        /**
         * Modifies events in-place.
         */
        @Override
        public Event intercept(Event event) {
          Map<String, String> headers = event.getHeaders();

          // If existing values should be preserved and the header is already present, return the event unchanged
          if (preserveExisting && headers.containsKey(header)) {
            return event;
          }
          if (host != null) {
            headers.put(header, host); // add the host to the event headers
          }

          return event;
        }

        @Override
        public List<Event> intercept(List<Event> events) {
          ... // calls intercept(Event event) for each event in events
        }

        @Override
        public void close() {
          // no-op
        }

        /**
         * Builder which builds new instances of the HostInterceptor.
         */
        public static class Builder implements Interceptor.Builder {

          private boolean preserveExisting = PRESERVE_DFLT;
          private boolean useIP = USE_IP_DFLT;
          private String header = HOST;

          @Override
          public Interceptor build() {
            return new HostInterceptor(preserveExisting, useIP, header);
          }

          @Override
          public void configure(Context context) {
            preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
            useIP = context.getBoolean(USE_IP, USE_IP_DFLT);
            header = context.getString(HOST_HEADER, HOST);
          }
        }

        public static class Constants {
          public static String HOST = "host";
          ... // default values for the other settings
        }
      }

    • Example usage in xx.conf (see the Flume User Guide for more details):

      agent.sources.r1.interceptors = i1
      agent.sources.r1.interceptors.i1.type = host
      # preserveExisting: whether to keep a value already present in the host header; the default is false (the header is overwritten)
      agent.sources.r1.interceptors.i1.preserveExisting = true
      agent.sources.r1.interceptors.i1.useIP = false
      agent.sources.r1.interceptors.i1.hostHeader = hostname
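To see what preserveExisting and hostHeader do, here is a minimal sketch of the same header logic. It assumes a hypothetical `HeaderStamper` class that works on a plain header map; the host string is passed in so the result is deterministic, and `localHost()` shows how it would be resolved with `InetAddress`, mirroring the constructor above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

// Hypothetical stand-in for HostInterceptor's core logic; headers are a plain Map.
class HeaderStamper {
  private final boolean preserveExisting;
  private final String header; // "host" by default; "hostname" in the config above
  private final String host;   // null if the local address could not be resolved

  HeaderStamper(boolean preserveExisting, String header, String host) {
    this.preserveExisting = preserveExisting;
    this.header = header;
    this.host = host;
  }

  // Resolves the local host like the constructor above; returns null on failure.
  static String localHost(boolean useIP) {
    try {
      InetAddress addr = InetAddress.getLocalHost();
      return useIP ? addr.getHostAddress() : addr.getCanonicalHostName();
    } catch (UnknownHostException e) {
      return null;
    }
  }

  // Modifies the header map in place and returns it.
  Map<String, String> intercept(Map<String, String> headers) {
    if (preserveExisting && headers.containsKey(header)) {
      return headers; // keep the value set upstream
    }
    if (host != null) {
      headers.put(header, host);
    }
    return headers;
  }
}
```

With the configuration above (`preserveExisting = true`, `hostHeader = hostname`), an event that already carries a `hostname` header from an upstream agent keeps it, while events without one get the local host name.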

    Selector

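    • First, a class diagram of the inheritance hierarchy of all the selectors:

      As the diagram shows, ChannelSelector comes with two built-in implementations: replicating and multiplexing. The default is ReplicatingChannelSelector.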

    ChannelSelector

    • interface

    • Selects a subset of channels from the configured set of channels, according to an implementation-specific policy.

      // NamedComponent: attaches a name to a component (setName() and getName())
      public interface ChannelSelector extends NamedComponent, Configurable {

        // @param channels: all channels the selector could select from.
        public void setChannels(List<Channel> channels);

        /**
         * Returns a list of required channels. A failure in writing the event to
         * these channels must be communicated back to the source that received this event.
         * @param event
         * @return the list of required channels that this selector has selected for
         * the given event.
         */
        public List<Channel> getRequiredChannels(Event event);

        /**
         * Returns a list of optional channels. A failure in writing the event to
         * these channels is ignored.
         * @param event
         * @return the list of optional channels that this selector has selected for
         * the given event.
         */
        public List<Channel> getOptionalChannels(Event event);

        /**
         * @return the list of all channels that this selector is configured to work
         * with.
         */
        public List<Channel> getAllChannels();
      }
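The two lists carry different failure semantics for whoever writes the event. A minimal sketch of that caller-side contract, assuming hypothetical `NamedChannel` and `Dispatcher` types in which a channel is just a named consumer that may throw; in Flume this role is roughly played by ChannelProcessor, which additionally wraps each write in a channel transaction, and that part is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model: a "channel" is a named consumer that may throw on write.
class NamedChannel {
  final String name;
  final Consumer<String> writer;

  NamedChannel(String name, Consumer<String> writer) {
    this.name = name;
    this.writer = writer;
  }
}

class Dispatcher {
  // Writes to every required channel first; a failure there propagates to the
  // caller (the source). Failures on optional channels are swallowed.
  // Returns the names of optional channels that failed, for illustration only.
  static List<String> dispatch(String event, List<NamedChannel> required, List<NamedChannel> optional) {
    for (NamedChannel ch : required) {
      ch.writer.accept(event); // exceptions propagate
    }
    List<String> failedOptional = new ArrayList<>();
    for (NamedChannel ch : optional) {
      try {
        ch.writer.accept(event);
      } catch (RuntimeException e) {
        failedOptional.add(ch.name); // ignored by design
      }
    }
    return failedOptional;
  }
}
```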
      
      ​
    AbstractChannelSelector

    • abstract class

      public abstract class AbstractChannelSelector implements ChannelSelector {

        private List<Channel> channels;
        private String name;

        ... // overrides ChannelSelector's getAllChannels(), setChannels(List<Channel> channels), setName(String name) and getName()

        // @return: A map of name to channel instance.
        protected Map<String, Channel> getChannelNameMap() {
          Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
          for (Channel ch : getAllChannels()) {
            // put each channel into the HashMap, keyed by its name
            channelNameMap.put(ch.getName(), ch);
          }
          return channelNameMap;
        }

        /**
         * Given a list of channel names as space delimited string,
         * returns list of channels.
         * @return List of {@linkplain Channel}s represented by the names.
         */
        // Resolves a space-separated string of channel names to the corresponding channels, using the name-to-channel map
        protected List<Channel> getChannelListFromNames(String channels,
                Map<String, Channel> channelNameMap) {
          List<Channel> configuredChannels = new ArrayList<Channel>();
          if (channels == null || channels.isEmpty()) { // nothing configured
            return configuredChannels;
          }
          String[] chNames = channels.split(" ");
          for (String name : chNames) {
            Channel ch = channelNameMap.get(name);
            if (ch != null) {
              configuredChannels.add(ch);
            } else {
              throw new FlumeException("Selector channel not found: "
                      + name);
            }
          }
          return configuredChannels;
        }

      }

    ReplicatingChannelSelector

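    • A concrete ChannelSelector implementation that copies every received event to each channel. (By contrast, MultiplexingChannelSelector chooses which channel(s) to use based on values in the event header.)

    • Replicating channel selector: allows the event to be placed in all the channels that the source is configured with.

    • In practice, all channels are put into requiredChannels by default, and optionalChannels starts out empty. The channels named in the "optional" setting are then added to optionalChannels and removed from requiredChannels (both steps happen in configure()). TODO: look into how this configuration is applied.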

      public class ReplicatingChannelSelector extends AbstractChannelSelector {

        // Configuration to set a subset of the channels as optional.
        public static final String CONFIG_OPTIONAL = "optional";
        List<Channel> requiredChannels = null;  // set in configure() to a copy of getAllChannels(), i.e. all configured channels
        List<Channel> optionalChannels = new ArrayList<Channel>();

        @Override
        public List<Channel> getRequiredChannels(Event event) {
          /*
           * Seems like there are lot of components within flume that do not call
           * configure method. It is conceivable that custom component tests too
           * do that. So in that case, revert to old behavior.
           */
          // If configure() was never called, requiredChannels is still null, so fall back to all channels.
          // TODO: where is configure() called? The same question comes up for many classes.
          if (requiredChannels == null) {
            return getAllChannels();
          }
          return requiredChannels;
        }

        @Override
        public List<Channel> getOptionalChannels(Event event) {
          return optionalChannels;
        }

        @Override
        public void configure(Context context) {
          String optionalList = context.getString(CONFIG_OPTIONAL);
          requiredChannels = new ArrayList<Channel>(getAllChannels());
          Map<String, Channel> channelNameMap = getChannelNameMap();
          // Resolve optionalList (whitespace-separated channel names) to channels and move each one
          // from requiredChannels into optionalChannels.
          if (optionalList != null && !optionalList.isEmpty()) {
            for (String optional : optionalList.split("\\s+")) {
              Channel optionalChannel = channelNameMap.get(optional);
              requiredChannels.remove(optionalChannel);
              if (!optionalChannels.contains(optionalChannel)) {
                optionalChannels.add(optionalChannel);
              }
            }
          }
        }
      }
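The required/optional split done in configure() can be reproduced in isolation. A sketch, assuming channels are identified by name only (`ReplicatingSplit` is a hypothetical class); unlike the code above, it trims the list and rejects unknown names instead of adding a null channel to optionalChannels.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ReplicatingChannelSelector.configure()'s bookkeeping,
// with channels represented by their names.
class ReplicatingSplit {
  final List<String> required;
  final List<String> optional = new ArrayList<>();

  ReplicatingSplit(List<String> allChannels, String optionalList) {
    required = new ArrayList<>(allChannels); // every channel is required by default
    if (optionalList != null && !optionalList.trim().isEmpty()) {
      // trim() avoids an empty first token when the list starts with whitespace
      for (String name : optionalList.trim().split("\\s+")) {
        if (!allChannels.contains(name)) {
          throw new IllegalArgumentException("Selector channel not found: " + name);
        }
        required.remove(name); // remove(Object), not remove(int)
        if (!optional.contains(name)) {
          optional.add(name);
        }
      }
    }
  }
}
```

In an agent configuration this corresponds to something like `a1.sources.r1.selector.type = replicating` together with `a1.sources.r1.selector.optional = c3`.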

    Sink

    Sink Runner

    • A driver for sinks that polls them, attempting to process events if any are available in the Channel. All sinks are polled.

      public class SinkRunner implements LifecycleAware {
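        private PollingRunner runner; // inner class implementing Runnable; its loop calls policy.process()
        private SinkProcessor policy; // the SinkProcessor (single-sink, failover or load-balancing) driven by this runner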
      }
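SinkRunner runs PollingRunner on its own thread, and that loop keeps calling the SinkProcessor. A rough sketch of such a polling loop, assuming a hypothetical `PollingLoop` class, a `PollStatus` enum standing in for `Sink.Status`, and a simple capped linear backoff when the processor reports BACKOFF; the real loop's counters, timing defaults and error handling are not reproduced here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical stand-in for Sink.Status.
enum PollStatus { READY, BACKOFF }

// Simplified, hypothetical model of SinkRunner's polling loop.
class PollingLoop implements Runnable {
  private final Supplier<PollStatus> processor; // plays the role of SinkProcessor.process()
  private final long backoffIncrementMs;
  private final long maxBackoffMs;
  final AtomicBoolean shouldStop = new AtomicBoolean(false);
  long consecutiveBackoffs = 0;
  int polls = 0;

  PollingLoop(Supplier<PollStatus> processor, long backoffIncrementMs, long maxBackoffMs) {
    this.processor = processor;
    this.backoffIncrementMs = backoffIncrementMs;
    this.maxBackoffMs = maxBackoffMs;
  }

  // Linear backoff capped at maxMs (an assumption of this sketch).
  static long backoffDelay(long consecutive, long incrementMs, long maxMs) {
    return Math.min(consecutive * incrementMs, maxMs);
  }

  @Override
  public void run() {
    while (!shouldStop.get()) {
      polls++;
      if (processor.get() == PollStatus.BACKOFF) {
        // Nothing to deliver (e.g. the channel was empty): wait a bit longer each time.
        consecutiveBackoffs++;
        try {
          Thread.sleep(backoffDelay(consecutiveBackoffs, backoffIncrementMs, maxBackoffMs));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      } else {
        consecutiveBackoffs = 0; // progress was made, reset the backoff
      }
    }
  }
}
```

In SinkRunner the loop runs on a dedicated thread started by start(), and stop() flips the stop flag; here `run()` can also be called directly for testing.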

    Sink Processor

    • There are two kinds:

      • DefaultSinkProcessor handles a single sink and passes events straight through without any extra processing.

        public void start() {
          Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
          sink.start();    // start() simply starts the single sink
          lifecycleState = LifecycleState.START;
        }

        // stop() is analogous; configure() is empty

        public Status process() throws EventDeliveryException {
          return sink.process();   // delegates directly to the sink's process()
        }

        public void setSinks(List<Sink> sinks) {
          Preconditions.checkNotNull(sinks);
          Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
              + "only handle one sink, "
              + "try using a policy that supports multiple sinks");
          sink = sinks.get(0);
        }
      • Multi-sink processors (subclasses of AbstractSinkProcessor), of which there are two:

        • FailoverSinkProcessor: failover. It keeps a priority-ordered list of sinks; a failed sink is demoted into a pool and given a cool-down period before it is tried again. setSinks() must be called before configure().

          public void setSinks(List<Sink> sinks) {
            // needed to implement the start/stop functionality
            super.setSinks(sinks);

            this.sinks = new HashMap<String, Sink>();
            for (Sink sink : sinks) {
              this.sinks.put(sink.getName(), sink);
            }
          }

          private Sink moveActiveToDeadAndGetNext() {
            Integer key = liveSinks.lastKey();
            // move the active sink (the highest-priority key in liveSinks) into failedSinks
            failedSinks.add(new FailedSink(key, activeSink, 1));
            liveSinks.remove(key);
            if (liveSinks.isEmpty()) return null;
            if (liveSinks.lastKey() != null) {
              return liveSinks.get(liveSinks.lastKey());
            } else {
              return null;
            }
          }
          ...

        • LoadBalancingSinkProcessor: balances load across multiple sinks. It keeps an indexed list of active sinks over which the load is distributed; sinks are chosen with either the ROUND_ROBIN (default) or the RANDOM selection mechanism.

          Internally this is done through a private interface, SinkSelector, implemented by two private static classes: RoundRobinSinkSelector and RandomOrderSinkSelector.
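The failover behavior described in the overview (a single exception is enough to demote a sink, which is retried after a cool-down that grows exponentially from 1 s up to 30 s) can be modeled as follows. This is a sketch, not FailoverSinkProcessor: `FailoverPicker` is hypothetical, delivery is a predicate, the clock is passed in, and the exact penalty formula is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical sketch of priority-based failover with an exponential cool-down.
class FailoverPicker {
  static final long BASE_PENALTY_MS = 1_000;  // assumed first cool-down (1 s, per the overview)
  static final long MAX_PENALTY_MS = 30_000;  // assumed cap (30 s, per the overview)

  private static final class Failed {
    final int priority;
    final String sink;
    final long retryAtMs;

    Failed(int priority, String sink, long retryAtMs) {
      this.priority = priority;
      this.sink = sink;
      this.retryAtMs = retryAtMs;
    }
  }

  // priority -> sink name; the highest key is the active sink
  private final TreeMap<Integer, String> liveSinks = new TreeMap<>();
  private final List<Failed> failedSinks = new ArrayList<>();
  private final Map<String, Integer> consecutiveFailures = new HashMap<>();

  void addSink(String sink, int priority) {
    liveSinks.put(priority, sink);
  }

  // 1 s, 2 s, 4 s, ... capped at 30 s
  static long penalty(int failures) {
    return Math.min(MAX_PENALTY_MS, BASE_PENALTY_MS << Math.min(failures - 1, 30));
  }

  // Returns the sink that accepted the event, or null if every live sink failed.
  String process(Predicate<String> deliver, long nowMs) {
    // Sinks whose cool-down has expired become live again.
    Iterator<Failed> it = failedSinks.iterator();
    while (it.hasNext()) {
      Failed f = it.next();
      if (f.retryAtMs <= nowMs) {
        liveSinks.put(f.priority, f.sink);
        it.remove();
      }
    }
    while (!liveSinks.isEmpty()) {
      Integer key = liveSinks.lastKey(); // highest priority first
      String sink = liveSinks.get(key);
      if (deliver.test(sink)) {
        consecutiveFailures.remove(sink);
        return sink;
      }
      // One failure is enough to demote the sink into the failed pool.
      liveSinks.remove(key);
      int failures = consecutiveFailures.merge(sink, 1, Integer::sum);
      failedSinks.add(new Failed(key, sink, nowMs + penalty(failures)));
    }
    return null;
  }
}
```

Keying the TreeMap by priority makes "current active sink" a single `lastKey()` lookup, which matches how `moveActiveToDeadAndGetNext()` above picks the next sink.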
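The two load-balancing orders (ROUND_ROBIN and RANDOM) can be sketched the same way. `SinkOrder`, `RoundRobinOrder` and `RandomOrder` are hypothetical names, not Flume's SinkSelector classes; each call returns the order in which sinks would be tried, the idea being that the processor falls through to the next sink in that order when one fails.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the two selection orders.
interface SinkOrder {
  // Returns the order in which sinks should be tried for the next batch.
  List<String> next(List<String> sinks);
}

class RoundRobinOrder implements SinkOrder {
  private int start = 0;

  @Override
  public List<String> next(List<String> sinks) {
    List<String> order = new ArrayList<>(sinks);
    if (!order.isEmpty()) {
      // Begin with the sink whose turn it is; the rest follow in list order.
      Collections.rotate(order, -(start % order.size()));
      start = (start + 1) % order.size();
    }
    return order;
  }
}

class RandomOrder implements SinkOrder {
  private final Random random;

  RandomOrder(Random random) {
    this.random = random;
  }

  @Override
  public List<String> next(List<String> sinks) {
    List<String> order = new ArrayList<>(sinks);
    Collections.shuffle(order, random); // uniformly random order per call
    return order;
  }
}
```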

  • Original post: https://www.cnblogs.com/wttttt/p/6873519.html