zoukankan      html  css  js  c++  java
  • Flume-NG中的Channel与Transaction关系(原创)

      在sink和source中(不管是内置还是自定义的),基本都有如下代码,这些代码在sink中的process方法中,而在source中自己不需要去写,在source中getChannelProcessor().processEventBatch(events)方法中会自动创建下面类似的:  

        ...
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        Status result = Status.READY;
        transaction.begin();
        ...
        event = channel.take();//getChannelProcessor().processEvent(event);,前者用于sink后者用于source
        ...
        transaction.commit();
        transaction.rollback()
        transaction.close();
        ...

      那么有些人就要问了?从上述代码中似乎只需要获取channel就可以了,因为获取数据时只需要event = channel.take()或者

    getChannelProcessor().processEvent(event)?这样对吗?你可以去掉transaction试试,结果显示是不行的,出错!

      那么为什么呢?这确实有点让人疑惑,但实际上channel.take()操作是transaction.doTake()。也就是实际的put和take等操作都是在transaction中进行的,因此要用channel必须要先创建transcation才可以使用。而channel.getTransaction()方法就是获取(已经创建)或创建(还没有)transcation,BasicChannelSemantics的相对应代码如下:  

    @Override
      public Transaction getTransaction() {
    
        if (!initialized) {
          synchronized (this) {
            if (!initialized) {
              initialize();
              initialized = true;
            }
          }
        }
    
        BasicTransactionSemantics transaction = currentTransaction.get();//获取transcation
        if (transaction == null || transaction.getState().equals(//如果transaction不存在或者已关闭就创建
                BasicTransactionSemantics.State.CLOSED)) {
          transaction = createTransaction();//创建
          currentTransaction.set(transaction);//赋值给currentTransaction
        }
        return transaction;
      }

      该方法在所有channel的父类BasicChannelSemantics中,然后在具体实现的channel类中需要实现protected abstract BasicTransactionSemantics createTransaction()这个抽象方法来获取相应的transaction对象。BasicChannelSemantics把transaction.take()和transaction.put(event)方法进一步封装成take()和put(event)方法,这俩方法就是暴露在sink或者source中的channel.take()和channel.put(event)方法。

     @Override
      public void put(Event event) throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        transaction.put(event);
      }
    
      @Override
      public Event take() throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        return transaction.take();
      }

      由此,可以看出工作行程了吧!

      Transaction transaction = channel.getTransaction();这一句至少要执行一次,因为执行一次之后就会将transcation对象缓存到currentTransaction中,后续就不会再创建transaction了。
  • 相关阅读:
    成为 Linux 内核高手的四个方法
    专访CEO何朝曦:深信服高速成长的秘诀
    世界上最让人痛苦的是无所事事(年纪轻轻当项目经理、研发总监真的不好)
    2015 8月之后"云计算"学习计划
    程辉:创造云计算的第四种商业模式(送源码的托管云)
    配置restful webservice 框架restkit
    跟我一起玩转Sencha Touch 移动 WebApp 开发1
    数据更新最佳实践
    addEventListener和attachEvent以及element.onclick的区别
    java线程池:ThreadPoolExecutor
  • 原文地址:https://www.cnblogs.com/lxf20061900/p/3679378.html
Copyright © 2011-2022 走看看