  Flume-ng Source Code Analysis: The Channel Component

    If you have not read the previous post, Flume-ng Source Code Analysis: The Startup Process, you may want to read it first.

    1 Interface Overview

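    The components are analyzed in the order in which they are started, as described in the previous post: first Channel, then Sink, and finally Source. Before reading the component source code, let's look at two important interfaces: LifecycleAware and NamedComponent.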

    1.1 LifecycleAware

    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public interface LifecycleAware {
    
      public void start();
    
      public void stop();
    
      public LifecycleState getLifecycleState();
    
    }
    

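    It is very simple: just three methods, start(), stop() and getLifecycleState(). Many Flume classes implement this interface, including PollingPropertiesFileConfigurationProvider, which was mentioned in Flume-ng Source Code Analysis: The Startup Process. Anything that has a lifecycle implements it, and that of course includes the components.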
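
    Here is a minimal sketch of a component that honors the LifecycleAware contract. So that it compiles without Flume on the classpath, it redeclares a simplified LifecycleAware interface and a LifecycleState enum locally. The enum values are an assumed subset of Flume's. DemoComponent is a hypothetical class that only records its state.

```java
// Simplified local stand-ins for Flume's lifecycle types (assumed subset, not the real classes).
enum LifecycleState { IDLE, START, STOP, ERROR }

interface LifecycleAware {
  void start();
  void stop();
  LifecycleState getLifecycleState();
}

// Hypothetical component that only tracks its own lifecycle state.
class DemoComponent implements LifecycleAware {
  // volatile: start()/stop() and getLifecycleState() may be called from different threads.
  private volatile LifecycleState state = LifecycleState.IDLE;

  @Override
  public void start() {
    // A real component would acquire its resources (threads, files, sockets) here.
    state = LifecycleState.START;
  }

  @Override
  public void stop() {
    // ...and release them here.
    state = LifecycleState.STOP;
  }

  @Override
  public LifecycleState getLifecycleState() {
    return state;
  }
}
```

    Code that supervises components can then poll getLifecycleState() to decide whether a component still needs to be started or stopped.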

    1.2 NamedComponent

    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public interface NamedComponent {
    
      public void setName(String name);
    
      public String getName();
    
    }
    

    There is not much to say about this one: it simply sets and returns the component's name.

    2 Channel

    Channel is one of Flume's three core components, so it is worth looking at how it is put together:

    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public interface Channel extends LifecycleAware, NamedComponent {
    
      public void put(Event event) throws ChannelException;
      public Event take() throws ChannelException;
      public Transaction getTransaction();
    }
    

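    The interface above shows that the main job of a Channel is put() and take(). Let's look at a concrete implementation, using MemoryChannel as the example. MemoryChannel is quite long, so we only look at a short excerpt: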

    public class MemoryChannel extends BasicChannelSemantics {
        private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
        private static final Integer defaultCapacity = Integer.valueOf(100);
        private static final Integer defaultTransCapacity = Integer.valueOf(100);
        
        public MemoryChannel() {
        }
    
        ...
    }
    

    It extends BasicChannelSemantics, which, as the name suggests, provides the basic Channel semantics. Let's look at its implementation:

    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public abstract class BasicChannelSemantics extends AbstractChannel {
    
      private ThreadLocal<BasicTransactionSemantics> currentTransaction
          = new ThreadLocal<BasicTransactionSemantics>();
    
      private boolean initialized = false;
    
      protected void initialize() {}
    
      protected abstract BasicTransactionSemantics createTransaction();
    
      @Override
      public void put(Event event) throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        transaction.put(event);
      }
    
      @Override
      public Event take() throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        return transaction.take();
      }
    
      @Override
      public Transaction getTransaction() {
    
        if (!initialized) {
          synchronized (this) {
            if (!initialized) {
              initialize();
              initialized = true;
            }
          }
        }
    
        BasicTransactionSemantics transaction = currentTransaction.get();
        if (transaction == null || transaction.getState().equals(
                BasicTransactionSemantics.State.CLOSED)) {
          transaction = createTransaction();
          currentTransaction.set(transaction);
        }
        return transaction;
      }
    }
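
    The essential mechanism in BasicChannelSemantics is that each thread gets its own transaction through a ThreadLocal. put() and take() then simply delegate to whatever transaction the calling thread currently holds. The sketch below reproduces only that mechanism, using hypothetical Tx and PerThreadTxChannel types and String events instead of Flume's Event. One deliberate difference from the excerpt is that initialized is declared volatile here, which the Java memory model generally requires for double-checked locking to be safe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified transaction: it only remembers what was put through it.
class Tx {
  enum State { OPEN, CLOSED }
  State state = State.OPEN;
  final List<String> puts = new ArrayList<>();
  void put(String event) { puts.add(event); }
  void close() { state = State.CLOSED; }
}

// Mirrors the structure of BasicChannelSemantics: one transaction per thread.
class PerThreadTxChannel {
  private final ThreadLocal<Tx> currentTransaction = new ThreadLocal<>();
  private volatile boolean initialized = false; // volatile makes double-checked locking safe
  int initCount = 0;                            // only to show initialize() runs once

  protected void initialize() { initCount++; }

  public Tx getTransaction() {
    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }
    Tx tx = currentTransaction.get();
    // Reuse this thread's transaction unless there is none yet or it has been closed.
    if (tx == null || tx.state == Tx.State.CLOSED) {
      tx = new Tx();
      currentTransaction.set(tx);
    }
    return tx;
  }

  public void put(String event) {
    Tx tx = currentTransaction.get();
    if (tx == null) {
      throw new IllegalStateException("No transaction exists for this thread");
    }
    tx.put(event); // delegate to the calling thread's transaction
  }
}
```

    Because the transaction is looked up per thread, a source thread and a sink thread that share the same channel never see each other's open transaction.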
    

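    After a long search we finally find put() and take(). A closer look, however, shows that they just call put() and take() on BasicTransactionSemantics, which is a little disappointing. Let's keep going and look at BasicTransactionSemantics: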

    public abstract class BasicTransactionSemantics implements Transaction {
    
      private State state;
      private long initialThreadId;
    
      protected void doBegin() throws InterruptedException {}
      protected abstract void doPut(Event event) throws InterruptedException;
      protected abstract Event doTake() throws InterruptedException;
      protected abstract void doCommit() throws InterruptedException;
      protected abstract void doRollback() throws InterruptedException;
      protected void doClose() {}
    
      protected BasicTransactionSemantics() {
        state = State.NEW;
        initialThreadId = Thread.currentThread().getId();
      }
    
      protected void put(Event event) {
        Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
            "put() called from different thread than getTransaction()!");
        Preconditions.checkState(state.equals(State.OPEN),
            "put() called when transaction is %s!", state);
        Preconditions.checkArgument(event != null,
            "put() called with null event!");
    
        try {
          doPut(event);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new ChannelException(e.toString(), e);
        }
      }
    
      protected Event take() {
        Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
            "take() called from different thread than getTransaction()!");
        Preconditions.checkState(state.equals(State.OPEN),
            "take() called when transaction is %s!", state);
    
        try {
          return doTake();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return null;
        }
      }
    
      protected State getState() {
        return state;
      }
    
      ...// We only discuss put and take here, so methods not covered yet have been removed; interested readers can read them in the source
    
      protected static enum State {
        NEW, OPEN, COMPLETED, CLOSED
      }
    }
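
    The checks in put() and take() follow a small state machine: NEW, OPEN, COMPLETED, CLOSED. The begin(), commit() and close() methods that drive it were cut from the excerpt. The transitions in the sketch below are therefore assumptions modeled on the State enum, not a copy of Flume's code. TxTemplate and ListTx are hypothetical names. The sketch shows the template-method pattern at work: the base class owns the checks, and the subclass only supplies doPut() and doCommit().

```java
import java.util.ArrayList;
import java.util.List;

// Simplified template-method base class modeled on BasicTransactionSemantics.
// The begin/commit/close transitions are assumptions: the excerpt elides them.
abstract class TxTemplate {
  enum State { NEW, OPEN, COMPLETED, CLOSED }

  private State state = State.NEW;
  private final long initialThreadId = Thread.currentThread().getId();

  protected abstract void doPut(String event);
  protected abstract void doCommit();

  private void check(boolean ok, String msg) {
    if (!ok) throw new IllegalStateException(msg + " (state=" + state + ")");
  }

  public void begin() {
    check(state == State.NEW, "begin() called twice");
    state = State.OPEN;
  }

  public void put(String event) {
    check(Thread.currentThread().getId() == initialThreadId, "put() from another thread");
    check(state == State.OPEN, "put() outside an open transaction");
    doPut(event); // the subclass supplies the actual storage logic
  }

  public void commit() {
    check(state == State.OPEN, "commit() without begin()");
    doCommit();
    state = State.COMPLETED;
  }

  public void close() {
    check(state == State.NEW || state == State.COMPLETED, "close() before commit/rollback");
    state = State.CLOSED;
  }

  public State getState() { return state; }
}

// Hypothetical concrete subclass: stages events and publishes them on commit.
class ListTx extends TxTemplate {
  final List<String> staged = new ArrayList<>();
  final List<String> committed = new ArrayList<>();

  @Override protected void doPut(String event) { staged.add(event); }

  @Override protected void doCommit() {
    committed.addAll(staged);
    staged.clear();
  }
}
```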
    

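    Yet another abstract class, and its put() and take() in turn call the abstract methods doPut() and doTake(). At this point impatient readers may be ready to give up, but we are only one step away. Since this is an abstract class, a Channel must ultimately use one of its concrete subclasses. Going back to MemoryChannel, where we started, and looking for clues, we find an inner class hidden inside it: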

    private class MemoryTransaction extends BasicTransactionSemantics {
        private LinkedBlockingDeque<Event> takeList;
        private LinkedBlockingDeque<Event> putList;
        private final ChannelCounter channelCounter;
        private int putByteCounter = 0;
        private int takeByteCounter = 0;
    
        public MemoryTransaction(int transCapacity, ChannelCounter counter) {
          putList = new LinkedBlockingDeque<Event>(transCapacity);
          takeList = new LinkedBlockingDeque<Event>(transCapacity);
    
          channelCounter = counter;
        }
    
        @Override
        protected void doPut(Event event) throws InterruptedException {
          channelCounter.incrementEventPutAttemptCount();
          int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
    
          if (!putList.offer(event)) {
            throw new ChannelException(
                "Put queue for MemoryTransaction of capacity " +
                putList.size() + " full, consider committing more frequently, " +
                "increasing capacity or increasing thread count");
          }
          putByteCounter += eventByteSize;
        }
    
        @Override
        protected Event doTake() throws InterruptedException {
          channelCounter.incrementEventTakeAttemptCount();
          if (takeList.remainingCapacity() == 0) {
            throw new ChannelException("Take list for MemoryTransaction, capacity " +
                takeList.size() + " full, consider committing more frequently, " +
                "increasing capacity, or increasing thread count");
          }
          if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
            return null;
          }
          Event event;
          synchronized (queueLock) {
            event = queue.poll();
          }
          Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
              "signalling existence of entry");
          takeList.put(event);
    
          int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
          takeByteCounter += eventByteSize;
    
          return event;
        }
    
       //... methods not needed here are again omitted
    
      }
    

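    In this class we find the implementations of doPut() and doTake(). MemoryChannel's put() and take() therefore end up in MemoryTransaction's doPut() and doTake(). Note that doPut() only stages the event in the transaction's own putList, so other transactions cannot take it until this transaction commits. doTake() removes an event from the shared queue and also records it in takeList, so that it can be returned to the queue if the transaction rolls back.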
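
    The division of labor between putList, takeList and the shared queue is easier to see in a stripped-down model. The sketch below is a hypothetical TinyMemoryChannel, not Flume's MemoryChannel. It drops the semaphores, byte accounting, transaction capacity and counters. It also assumes the elided commit and rollback logic behaves as the comments describe: puts become visible on commit, and taken events go back to the front of the queue on rollback.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// A much-simplified memory channel: a shared bounded queue plus per-transaction
// putList/takeList staging areas. Semaphores, byte accounting and counters are omitted.
class TinyMemoryChannel {
  private final Deque<String> queue = new ArrayDeque<>();
  private final int capacity;

  TinyMemoryChannel(int capacity) { this.capacity = capacity; }

  synchronized int size() { return queue.size(); }

  TinyTransaction newTransaction() { return new TinyTransaction(); }

  class TinyTransaction {
    private final Deque<String> putList = new ArrayDeque<>();
    private final Deque<String> takeList = new ArrayDeque<>();

    // Like doPut(): only staged; other transactions cannot see the event yet.
    void put(String event) { putList.addLast(event); }

    // Like doTake(): move the head of the shared queue into takeList (null if empty).
    String take() {
      synchronized (TinyMemoryChannel.this) {
        String event = queue.pollFirst();
        if (event != null) takeList.addLast(event);
        return event;
      }
    }

    // Assumed commit: publish staged puts; taken events are now really gone.
    void commit() {
      synchronized (TinyMemoryChannel.this) {
        if (queue.size() + putList.size() > capacity) {
          throw new IllegalStateException("channel full, cannot commit " + putList.size() + " events");
        }
        queue.addAll(putList);
        putList.clear();
        takeList.clear();
      }
    }

    // Assumed rollback: drop staged puts, return taken events to the FRONT of the queue in order.
    void rollback() {
      synchronized (TinyMemoryChannel.this) {
        while (!takeList.isEmpty()) {
          queue.addFirst(takeList.pollLast());
        }
        putList.clear();
      }
    }
  }
}
```

    In real Flume code, callers wrap put() and take() the same way: getTransaction(), begin(), the put or take calls, then commit() (or rollback() if anything failed), and finally close().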

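    Some readers may think the analysis ends here, but the best part is still to come. The channel layer has two more important classes, ChannelProcessor and ChannelSelector, and I will walk through them one at a time.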

    3 ChannelProcessor

    ChannelProcessor performs the put operation, that is, it writes events into channels. Each ChannelProcessor instance is given a ChannelSelector that decides which channel(s) an event is put into.

    public class ChannelProcessor implements Configurable {
        private static final Logger LOG = LoggerFactory.getLogger(ChannelProcessor.class);
        private final ChannelSelector selector;
        private final InterceptorChain interceptorChain;
    
        public ChannelProcessor(ChannelSelector selector) {
            this.selector = selector;
            this.interceptorChain = new InterceptorChain();
        }
    
        public void initialize() {
            this.interceptorChain.initialize();
        }
    
        public void close() {
            this.interceptorChain.close();
        }
    
        public void configure(Context context) {
            this.configureInterceptors(context);
        }
    
        private void configureInterceptors(Context context) {
            // configure the interceptors (body omitted)
        }
    
        public ChannelSelector getSelector() {
            return this.selector;
        }
    
        public void processEventBatch(List<Event> events) {
            ...
            for (Event event : events) {
                List<Channel> reqChannels = this.selector.getRequiredChannels(event);
    
                ...// add the event to the queue of each required channel
    
                List<Channel> optChannels = this.selector.getOptionalChannels(event);
    
                ...// add the event to the queue of each optional channel
    
            }
    
            ...// put each channel's queued events into that channel
    
        }
    
        public void processEvent(Event event) {
            event = this.interceptorChain.intercept(event);
            if (event != null) {
                List<Channel> requiredChannels = this.selector.getRequiredChannels(event);
                for (Channel reqChannel : requiredChannels) {
                    ...// put the event into reqChannel
                }
    
                List<Channel> optionalChannels = this.selector.getOptionalChannels(event);
                for (Channel optChannel : optionalChannels) {
                    ...// put the event into optChannel
                }
            }
        }
    }
    

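    To simplify the code I removed some parts and kept only what needs explaining. In short, both write methods of ChannelProcessor, processEvent() and processEventBatch(), ask the selector passed in through the constructor for the target channels (required and optional). They then put the event into each of those channels. Next, let's look at ChannelSelector.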
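
    The control flow of processEvent() can be sketched with stand-in types. In this hypothetical ToyChannelProcessor, the interceptor chain and the selector are plain functions, and the per-channel transactions are left out. The failure handling it models comes from Flume's ChannelProcessor source rather than from the excerpt above, which elides it. A failure on a required channel propagates back to the caller (the source), while a failure on an optional channel is logged and ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical minimal channel: put() may throw to simulate a full channel.
interface ToyChannel {
  void put(String event);
}

// Sketch of processEvent()'s control flow (transactions omitted):
// intercept, write to every required channel (failures propagate),
// then to every optional channel (failures are logged and swallowed).
class ToyChannelProcessor {
  private final UnaryOperator<String> interceptor;           // stands in for InterceptorChain
  private final Function<String, List<ToyChannel>> required; // selector.getRequiredChannels
  private final Function<String, List<ToyChannel>> optional; // selector.getOptionalChannels

  ToyChannelProcessor(UnaryOperator<String> interceptor,
                      Function<String, List<ToyChannel>> required,
                      Function<String, List<ToyChannel>> optional) {
    this.interceptor = interceptor;
    this.required = required;
    this.optional = optional;
  }

  void processEvent(String event) {
    event = interceptor.apply(event);
    if (event == null) {
      return; // an interceptor dropped the event
    }
    for (ToyChannel ch : required.apply(event)) {
      ch.put(event); // an exception here aborts processing and reaches the caller
    }
    for (ToyChannel ch : optional.apply(event)) {
      try {
        ch.put(event);
      } catch (RuntimeException e) {
        System.err.println("optional channel failed, ignoring: " + e.getMessage());
      }
    }
  }
}

// Hypothetical channel that records events, or always fails when `broken` is set.
class RecordingChannel implements ToyChannel {
  final List<String> events = new ArrayList<>();
  final boolean broken;

  RecordingChannel(boolean broken) { this.broken = broken; }

  @Override
  public void put(String event) {
    if (broken) throw new IllegalStateException("channel full");
    events.add(event);
  }
}
```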

    4 ChannelSelector

    ChannelSelector is an interface whose implementations are created through ChannelSelectorFactory. Flume ships two implementations: MultiplexingChannelSelector and ReplicatingChannelSelector.

    public interface ChannelSelector extends NamedComponent, Configurable {
        void setChannels(List<Channel> channels);
    
        List<Channel> getRequiredChannels(Event event);
    
        List<Channel> getOptionalChannels(Event event);
    
        List<Channel> getAllChannels();
    }
    
    

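    A selector is created by ChannelSelectorFactory.create(), which calls getSelectorForType() to instantiate the subclass that matches the type set in the configuration file. If no type is configured, a ReplicatingChannelSelector is used. A type that is not one of the built-in names is treated as the class name of a custom selector and loaded by reflection.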

    public class ChannelSelectorFactory {
    
      private static final Logger LOGGER = LoggerFactory.getLogger(
          ChannelSelectorFactory.class);
    
      public static ChannelSelector create(List<Channel> channels,
          Map<String, String> config) {
    
    	  ...
      }
    
      public static ChannelSelector create(List<Channel> channels,
          ChannelSelectorConfiguration conf) {
        String type = ChannelSelectorType.REPLICATING.toString();
        if (conf != null) {
          type = conf.getType();
        }
        ChannelSelector selector = getSelectorForType(type);
        selector.setChannels(channels);
        Configurables.configure(selector, conf);
        return selector;
      }
    
      private static ChannelSelector getSelectorForType(String type) {
        if (type == null || type.trim().length() == 0) {
          return new ReplicatingChannelSelector();
        }
    
        String selectorClassName = type;
        ChannelSelectorType  selectorType = ChannelSelectorType.OTHER;
    
        try {
          selectorType = ChannelSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
        } catch (IllegalArgumentException ex) {
          LOGGER.debug("Selector type {} is a custom type", type);
        }
    
        if (!selectorType.equals(ChannelSelectorType.OTHER)) {
          selectorClassName = selectorType.getChannelSelectorClassName();
        }
    
        ChannelSelector selector = null;
    
        try {
          @SuppressWarnings("unchecked")
          Class<? extends ChannelSelector> selectorClass =
              (Class<? extends ChannelSelector>) Class.forName(selectorClassName);
          selector = selectorClass.newInstance();
        } catch (Exception ex) {
          throw new FlumeException("Unable to load selector type: " + type
              + ", class: " + selectorClassName, ex);
        }
    
        return selector;
      }
    
    }
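
    The lookup in getSelectorForType() comes down to three cases. A blank type means replicating. A built-in alias maps to a known class name. Anything else is treated as a fully qualified class name and loaded by reflection. Here is a self-contained sketch of that resolution with hypothetical Selector, SelectorType and SelectorFactory types. It uses getDeclaredConstructor().newInstance() instead of the deprecated Class.newInstance().

```java
import java.util.Locale;

// Hypothetical selector hierarchy standing in for Flume's ChannelSelector classes.
interface Selector { String name(); }
class ReplicatingSelector implements Selector { public String name() { return "replicating"; } }
class MultiplexingSelector implements Selector { public String name() { return "multiplexing"; } }

// Mirrors the idea of ChannelSelectorType: built-in aliases mapped to class names, OTHER for custom ones.
enum SelectorType {
  OTHER(null),
  REPLICATING(ReplicatingSelector.class.getName()),
  MULTIPLEXING(MultiplexingSelector.class.getName());

  final String className;
  SelectorType(String className) { this.className = className; }
}

class SelectorFactory {
  static Selector forType(String type) {
    if (type == null || type.trim().isEmpty()) {
      return new ReplicatingSelector(); // default when no type is configured
    }
    String className = type; // assume a custom, fully qualified class name...
    try {
      SelectorType t = SelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
      if (t != SelectorType.OTHER) {
        className = t.className; // ...unless it is a built-in alias
      }
    } catch (IllegalArgumentException ex) {
      // not a built-in alias: keep treating `type` as a class name
    }
    try {
      return Class.forName(className)
          .asSubclass(Selector.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException | ClassCastException ex) {
      throw new IllegalArgumentException("Unable to load selector type: " + type
          + ", class: " + className, ex);
    }
  }
}
```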
    

    A brief word on each of the two selectors:

    1) MultiplexingChannelSelector
    Here is a sample channel selector configuration:

    agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
    agent_foo.sources.avro-AppSrv-source1.selector.header = State
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
    agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
    

    MultiplexingChannelSelector defines three fields that hold the different kinds of channels:

        private Map<String, List<Channel>> channelMapping;
        private Map<String, List<Channel>> optionalChannels;
        private List<Channel> defaultChannels;
    

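    The routing is driven by the value of the event header named by selector.header (State in the example above). The rules are: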

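    • If a mapping is configured for the header value, the event is always sent to the mapped channels; if optional channels are also configured for that value, it is sent to them as well.
    • If there is no mapping for the header value but a default is configured, the event is sent to the default channels, and also to any configured optional channels.
    • If neither a mapping nor a default applies but optional channels are configured, the event is sent only to the optional channels; failed writes to optional channels are not retried.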
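
    The rules above can be sketched as two lookups keyed by the configured header. MultiplexingSketch is a hypothetical class that represents channels by name. It follows the behavior described in the list and leaves out parsing and validating the selector.* properties.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Sketch of multiplexing selection with channels as plain names. headerName, mapping,
// optional and defaults correspond to selector.header, selector.mapping.*,
// selector.optional.* and selector.default.
class MultiplexingSketch {
  private final String headerName;
  private final Map<String, List<String>> mapping;
  private final Map<String, List<String>> optional;
  private final List<String> defaults;

  MultiplexingSketch(String headerName, Map<String, List<String>> mapping,
                     Map<String, List<String>> optional, List<String> defaults) {
    this.headerName = headerName;
    this.mapping = mapping;
    this.optional = optional;
    this.defaults = defaults;
  }

  // Required channels: the mapping for the header value, falling back to the defaults.
  List<String> requiredChannels(Map<String, String> headers) {
    String value = headers.get(headerName);
    if (value == null || value.trim().isEmpty()) {
      return defaults;
    }
    return mapping.getOrDefault(value, defaults);
  }

  // Optional channels: looked up by the same header value; empty if none configured.
  List<String> optionalChannels(Map<String, String> headers) {
    String value = headers.get(headerName);
    if (value == null) {
      return Collections.emptyList();
    }
    return optional.getOrDefault(value, Collections.<String>emptyList());
  }
}
```

    With the sample configuration, an event whose State header is AZ must go to file-channel-2. An event with an unmapped value such as TX, or with no State header at all, falls back to mem-channel-1. Only CA has optional channels.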

    2) ReplicatingChannelSelector

    Its rule is simpler:

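    • With replicating, if no optional channel is specified, every channel receives the event. If a channel is specified as optional, it is removed from the required channels and receives the event only as an optional channel.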
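
    The replicating rule amounts to splitting the source's channel list once, at configuration time. In the hypothetical ReplicatingSketch below, channels are names and the optional set stands in for the selector.optional property. Neither list depends on the event.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Sketch of replicating selection: every channel is required except those listed
// as optional, which move to the optional list instead.
class ReplicatingSketch {
  private final List<String> required = new ArrayList<>();
  private final List<String> optional = new ArrayList<>();

  ReplicatingSketch(List<String> allChannels, Set<String> optionalNames) {
    for (String ch : allChannels) {
      if (optionalNames.contains(ch)) {
        optional.add(ch);
      } else {
        required.add(ch);
      }
    }
  }

  // The event is not inspected: every event goes to the same channels.
  List<String> requiredChannels() { return Collections.unmodifiableList(required); }

  List<String> optionalChannels() { return Collections.unmodifiableList(optional); }
}
```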

    5 Summary

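    As the component that links the two ends of an agent, a Channel carries data from the Source through itself to the Sink. ChannelProcessor puts events into the channels assigned to them, and the assignment rules are decided by the selector; Flume provides two selectors, multiplexing and replicating. This is why ChannelProcessor is normally called from a Source, and, correspondingly, Channel.take() is called from a Sink.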

  Original article: https://www.cnblogs.com/simple-focus/p/6523320.html