zoukankan      html  css  js  c++  java
  • Flume-NG源码阅读之HBaseSink

      关于HBase的sink的所有内容均在org.apache.flume.sink.hbase包下。

      每个sink包括自己定制的,都extends AbstractSink implements Configurable。

      一、首先是configure(Context context)方法。该方法是对HBaseSink的参数初始化。主要包括以下几个:

      tableName:要写入的HBase数据表名,不能为空;

      columnFamily:数据表对应的列簇名,这个sink目前只支持一个列簇,不能为空;

      batchSize:每次事务可以处理的最大Event数量,默认是100;

      eventSerializerType:用来将event写入HBase,即将event转化为put。默认是org.apache.flume.sink.hbase.SimpleHbaseEventSerializer,还有一个是RegexHbaseEventSerializer,即适合HBaseSink的Serializer只有这俩,否则自己定制;

      serializerContext:是eventSerializerType的配置信息,就是配置文件中包含“serializer.”的项;

      kerberosKeytab和kerberosPrincipal是用来做访问控制的,默认都为空,即不设置。

      并生成eventSerializerType对应的实例并加以配置,两个Serializer各有不同的用途主要是一个只能写一列,一个可以写多列: 

    1 Class<? extends HbaseEventSerializer> clazz =
    2           (Class<? extends HbaseEventSerializer>)
    3           Class.forName(eventSerializerType);
    4       serializer = clazz.newInstance();
    5       serializer.configure(serializerContext);        //配置序列化组件,先配置。默认是SimpleHbaseEventSerializer

      1、SimpleHbaseEventSerializer.configure(Context context):此Serializer只能将数据写入一列

     1   public void configure(Context context) {
     2     rowPrefix = context.getString("rowPrefix", "default");  //获取RowKey的前缀,固定的部分,默认前缀是default
     3     incrementRow =
     4         context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);//获取计数器对应的行键
     5     String suffix = context.getString("suffix", "uuid");  //rowkey的类型(可以指定的有四种uuid/random/timestamp/nano),默认是uuid
     6 
     7     String payloadColumn = context.getString("payloadColumn");  //要写入HBase的列名
     8     String incColumn = context.getString("incrementColumn");  //计数器对应的列
     9     if(payloadColumn != null && !payloadColumn.isEmpty()) {  //根据suffix决定rowkey类型
    10       if(suffix.equals("timestamp")){
    11         keyType = KeyType.TS;
    12       } else if (suffix.equals("random")) {
    13         keyType = KeyType.RANDOM;
    14       } else if(suffix.equals("nano")){
    15         keyType = KeyType.TSNANO;
    16       } else {
    17         keyType = KeyType.UUID;
    18       }
    19       plCol = payloadColumn.getBytes(Charsets.UTF_8);  //列名
    20     }
    21     if(incColumn != null && !incColumn.isEmpty()) {  //存在计数器列
    22       incCol = incColumn.getBytes(Charsets.UTF_8);
    23     }
    24   }

      2、RegexHbaseEventSerializer.configure(Context context):此Serializer根据正则可以写入多列

      public void configure(Context context) {
        String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);  //获取配置文件中的正则表达式,默认是“(.*)”
        regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, 
            INGORE_CASE_DEFAULT);  //是否忽略大小写
        inputPattern = Pattern.compile(regex, Pattern.DOTALL
            + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));  //将给定的正则表达式编译到具有给定标志的模式中
        
        String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);  //获取配置文件中的列名s
        String[] columnNames = colNameStr.split(",");  //分割列名获得列名数组
        for (String s: columnNames) { 
          colNames.add(s.getBytes(Charsets.UTF_8));
        }
      }

      二、start()方法。该方法首先会构造一个HTable对象,并table.setAutoFlush(false)来激活缓冲区(默认大小时2MB),随后的是一些检查。

      三、然后是process()方法用来从channel中take数据,serializer之后写入HBase。

     1 public Status process() throws EventDeliveryException {
     2     Status status = Status.READY;
     3     Channel channel = getChannel();
     4     Transaction txn = channel.getTransaction();
     5     List<Row> actions = new LinkedList<Row>();
     6     List<Increment> incs = new LinkedList<Increment>();
     7     txn.begin();
     8     for(long i = 0; i < batchSize; i++) {
     9       Event event = channel.take();
    10       if(event == null){
    11         status = Status.BACKOFF;
    12         counterGroup.incrementAndGet("channel.underflow");
    13         break;
    14       } else {
    15         serializer.initialize(event, columnFamily);
    16         actions.addAll(serializer.getActions());
    17         incs.addAll(serializer.getIncrements());
    18       }
    19     }
    20     putEventsAndCommit(actions, incs, txn);
    21     return status;
    22   }

      1、actions和incs是要写入HBase的数据,actions对应的是数据;incs对应的是计数器。

      2、serializer.initialize(event, columnFamily),两个Serializer的initialize目的一样:

    1 public void initialize(Event event, byte[] columnFamily) {
    2     this.payload = event.getBody();  //获取要处理的数据
    3     this.cf = columnFamily;    //获取要写入的列簇
    4   }

      3、serializer.getActions()

      SimpleHbaseEventSerializer.getActions()方法会根据configure(Context context)中设置的RowKey类型先获取rowkey,可以是毫秒时间戳、随机数、纳秒时间戳以及UUID128位数四种类型。然后构造一个Put对象,将(列簇,列名,数据)添加进这个Put,返回List<Row> actions。

      RegexHbaseEventSerializer.getActions()方法,首先会做一些判断匹配成功否?匹配出的个数和指定的列数相同否?,然后是获取rowkey,这里的rowkey是[time in millis]-[random key]-[nonce]三部分组成的字符串。剩下的是依次匹配列组成Put,返回List<Row> actions。

      4、serializer.getIncrements()

      SimpleHbaseEventSerializer.getIncrements()如果配置文件中配置了incrementColumn,就添加相应的计数器,否则返回一个没有数据的List<Increment>。

      RegexHbaseEventSerializer.getIncrements()直接返回一个没有数据的List<Increment>,即不设置计数器。

      5、putEventsAndCommit(actions, incs, txn)方法。首先会table.batch(actions)提交List<Put>;然后是计数器table.increment(i);txn.commit()提交事务;如有异常txn.rollback()回滚;txn.close()事务关闭。

      四、stop()方法。table.close();table = null;

      有两个问题撒:

      1、我们在开发HBase程序的时候总是要指定“hbase.zookeeper.quorum”对应的zookeeper地址的,但是看完HBaseSink也没发现设置的地方,是不是在HBase集群中的任意节点都不需要设置,除非在集群外节点才设置?

      2、还有在使用时发现放在安装有zookeeper的节点上运行flume报错,删除zookeeper后运行正常,没安装zookeeper的节点上运行正常,这是为什么??

      希望知道的可以解答哈。。。HBaseSink也比较简单。。。后续还有更多源码解读!敬请期待!!

  • 相关阅读:
    C/C++——二维数组与指针、指针数组、数组指针(行指针)、二级指针的用法
    C/C++——C语言数组名与指针
    C/C++——C语言跳出多重循环方法
    知识储备——国际象棋术语中英文对照
    C/C++——C语言库函数大全
    C/C++——C语言常用库函数
    C/C++——[05] 函数
    C/C++——[04] 语句
    C/C++——[03] 注释
    C/C++——[02] 运算符和表达式
  • 原文地址:https://www.cnblogs.com/lxf20061900/p/3707730.html
Copyright © 2011-2022 走看看