zoukankan      html  css  js  c++  java
  • Flume学习——BasicChannelSemantics

    public class MemoryChannel extends BasicChannelSemantics
    
    public abstract class BasicChannelSemantics extends AbstractChannel
    
    public abstract class AbstractChannel implements Channel, LifecycleAware, Configurable

    AbstractChannel只在实现了LifecycleAware,NamedComponent和Configurable中的一些接口,除了对部分方法进行了同步,没什么特殊。

    BasicChannelSemantics是实现事务机制的关键之一

     * <p>
     * An implementation of basic {@link Channel} semantics, including the
     * implied thread-local semantics of the {@link Transaction} class,
     * which is required to extend {@link BasicTransactionSemantics}.
     * </p>
     */

    BasicChannelSemantics实现了基础的Channel语法,包括了Transaction的thread-local语法。

    首先BasicChannelSemantics类持有一个ThreadLocal对象,它维护了一个BasicTransactionSemantics对象。BasicTransationSementics是一个abstract class,提供了Transaction接口的基础实现。

    这些每个线程都包括了一个唯一的Transaction对象,保证了事务的隔离性。

      private ThreadLocal<BasicTransactionSemantics> currentTransaction
          = new ThreadLocal<BasicTransactionSemantics>();

    在线程中获取当前线程中的事务通过getTransaction方法,它会调用BasicChannelSemantics中定义的的抽象方法createTransaction()来获取BasicTransactionSemantics的实例。

    /**
       * <p>
       * Initializes the channel if it is not already, then checks to see
       * if there is an open transaction for this thread, creating a new
       * one via <code>createTransaction</code> if not.
       * @return the current <code>Transaction</code> object for the
       *     calling thread
       * </p>
       */
      @Override
      public Transaction getTransaction() {
    
        if (!initialized) {
          synchronized (this) {
            if (!initialized) {
              initialize();
              initialized = true;
            }
          }
        }
    
        BasicTransactionSemantics transaction = currentTransaction.get();
        if (transaction == null || transaction.getState().equals(
                BasicTransactionSemantics.State.CLOSED)) {
          transaction = createTransaction();
          currentTransaction.set(transaction);
        }
        return transaction;
      }

    首先,Channel使用了延迟初始化的机制。只有在Channel第一次被调用getTransaction()时,它的initialize()方法才被调用。

    当currentTransaction中并不包含一个BasicTransactionSemantics对象的时候,或者当前的transaction对象已经处理CLOSED状态的时候,它就调用createTransaction方法来获取一个Transaction,并设置给thread-local的currentTransaction。

    至此,Transaction的thread-local机制已实现。

    那么Channel是如何利用thread-local的Transaction对象来实现消息存取的事务呢?它只是确保事务已开启,然后将消息的存取功能代理给本线程的transaction对象。

    
      @Override
      public void put(Event event) throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        transaction.put(event);
      }

      @Override
      public Event take() throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        return transaction.take();
      }
  • 相关阅读:
    DataPipeline合伙人&CPO陈雷:成为中国的世界级数据中间件厂商
    数字化转型提速,DataPipeline助力中石油产业大数据实时共享能力再提升
    IT168专访|DataPipeline 合伙人&CPO陈雷:我们致力于成为中国的世界级数据中间件厂商
    Odoo 新老版本关于action window动作绑定的两种写法
    关于 错误"ERROR:RELAXNGV:RELAXNG_ERR_EXTRACO NTENT: Element openerp has extra content: data"的处理
    Odoo 前期各版本的协议变化对比
    如何让odoo在执行如PG、Less编译等调用时出现的异常信息打印出来
    Odoo14 官方新版docker镜像跑起来网站页面异常 元素 '<xpath expr="//*[hasclass('o_footer_copyright_name')]">' 在母级视图中没有找到
    一种将历史地图坐标配准到GIS中的方法
    水利地理信息系统
  • 原文地址:https://www.cnblogs.com/devos/p/3440205.html
Copyright © 2011-2022 走看看