zoukankan      html  css  js  c++  java
  • Flume-ng源码解析之Sink组件--转载

    Flume-ng源码解析之Sink组件

     

    如果你还没看过Flume-ng源码解析系列中的启动流程和Channel组件,可以点击下面链接:
    Flume-ng源码解析之启动流程
    Flume-ng源码解析之Channel组件

    作为启动流程中第二个启动的组件,我们今天来看看Sink的细节

    1 Sink

    Sink在agent中扮演的角色是消费者,将event输送到特定的位置

    首先依然是看代码,由代码我们可以看出Sink是一个接口,里面最主要的方法是process(),用来处理从Channel中获取的数据。Sink的实例是由SinkFactory.create()生成的。

    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public interface Sink extends LifecycleAware, NamedComponent {
      public void setChannel(Channel channel);
      public Channel getChannel();
      /* 用来处理channel中取来的event*/
      public Status process() throws EventDeliveryException;
      public static enum Status {
        READY, BACKOFF
      }
    }

    在启动流程中我们了解到Application中启动的不是Sink,而是SinkRunner,由名字我们可以看出这是一个驱动类。我们来看看代码,主要看它的start()

    public class SinkRunner implements LifecycleAware {
    
      ...
    
      @Override
      public void start() {
        SinkProcessor policy = getPolicy();
    
        policy.start();
    
        runner = new PollingRunner();
    
        runner.policy = policy;
        runner.counterGroup = counterGroup;
        runner.shouldStop = new AtomicBoolean();
    
        runnerThread = new Thread(runner);
        runnerThread.setName("SinkRunner-PollingRunner-" +
            policy.getClass().getSimpleName());
        runnerThread.start();
    
        lifecycleState = LifecycleState.START;
      }
      ...
    
    }

    我们知道启动SinkRunner实际上就是调用它的start(),而在start()中可以看到主要是启动了一个SinkProcessor,而这个SinkProcessor在创建SinkRunnner的时候已经指定了,如果你想要了解配置文件是如何处理的,可以要去看看conf包里面的类,可以看看org.apache.flume.node.AbstractConfigurationProvider中的getConfiguration()。

    我们接着看看SinkProcessor

    public interface SinkProcessor extends LifecycleAware, Configurable {
      Status process() throws EventDeliveryException;
      void setSinks(List<Sink> sinks);
    }

    SinkProcesor是一个接口,他的实现类由SinkProcessorFactory的getProcessor()生成,在AbstractConfigurationProvider中的loadSinkGroup()调用SinkGroup中的configure()生成。

    public class SinkGroup implements Configurable, ConfigurableComponent {
      List<Sink> sinks;
      SinkProcessor processor;
      SinkGroupConfiguration conf;
    
      public SinkGroup(List<Sink> groupSinks) {
        sinks = groupSinks;
      }
      
      public SinkProcessor getProcessor() {
        return processor;
      }
    
      @Override
      public void configure(ComponentConfiguration conf) {
        this.conf = (SinkGroupConfiguration) conf;
        processor =
            SinkProcessorFactory.getProcessor(this.conf.getProcessorContext(),
                sinks);
      }
    }
    

    那么我们以DefalutSinkProcessor为例子看看

    public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
      private Sink sink;
      private LifecycleState lifecycleState;
    
      @Override
      public void start() {
        Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
        sink.start();
        lifecycleState = LifecycleState.START;
      }
    
      @Override
      public void stop() {
        Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
        sink.stop();
        lifecycleState = LifecycleState.STOP;
      }
    
      @Override
      public LifecycleState getLifecycleState() {
        return lifecycleState;
      }
    
      @Override
      public void configure(Context context) {
      }
    
      @Override
      public Status process() throws EventDeliveryException {
        return sink.process();
      }
    
      @Override
      public void setSinks(List<Sink> sinks) {
        Preconditions.checkNotNull(sinks);
        Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
            + "only handle one sink, "
            + "try using a policy that supports multiple sinks");
        sink = sinks.get(0);
      }
    
      @Override
      public void configure(ComponentConfiguration conf) {
    
      }
    
    }

    从上面的代码中我们可以看到SinkProcessor执行的还是sink的start、stop和process方法,那么SinkProcessor的作用是什么,Flume提供leFailoverSinkProcessor和LoadBalancingSinkProcessor,顾名思义,一个是失效备援,一个是负载均衡,那么SinkProcessor不同子类的存在就是为了实现不同的分配操作和策略。而sink的start()通常是启动线程去执行消费操作。

  • 相关阅读:
    算法刷题训练(2020.10.6)
    使用Gitee作为图片仓库,博客园引入图片【2020.10.3】
    阿里云DevOps助理工程师认证学习笔记:敏捷项目管理+需求分析【2020.10.2 10.3】
    算法刷题训练(2020.9.30)
    爬取百度贴吧数据(练习Python爬虫)
    算法刷题训练(2020.9.29)
    算法刷题训练(2020.9.28)
    软件测试:软件开发模型和软件测试模型
    计算机组成原理练习题(有助于理解概念)
    wamp环境下Wordpress的安装
  • 原文地址:https://www.cnblogs.com/parent-absent-son/p/11598663.html
Copyright © 2011-2022 走看看