zoukankan      html  css  js  c++  java
  • flume 自定义 hbase sink 类

    參考(向原作者致敬)
    • http://ydt619.blog.51cto.com/316163/1230586
    • https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase

    flume 1.5 的配置文件演示样例

    # Name the components on this agent
    a1.sources  = r1
    a1.sinks =  k1
    a1.channels  = c1
    
    # Describe/configure the source
    a1.sources.r1.type  = spooldir
    a1.sources.r1.spoolDir  = /home/scut/Downloads/testFlume
    
    # Describe the sink
    # NOTE: comments must be on their own line. A trailing "# ..." after a value
    # is not a comment in a properties file; it becomes part of the value.
    a1.sinks.k1.type  = org.apache.flume.sink.hbase.AsyncHBaseSink
    # HBase table name
    a1.sinks.k1.table = Router
    # HBase column family
    a1.sinks.k1.columnFamily = log
    # HBase column names, comma separated, in the same order as the fields of each event
    a1.sinks.k1.serializer.payloadColumn=serviceTime,browerOS,clientTime,screenHeight,screenWidth,url,userAgent,mobileDevice,gwId,mac
    # Serializer class that turns each event into HBase puts
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.BaimiAsyncHbaseEventSerializer
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type  = memory
    a1.channels.c1.capacity  = 1000
    a1.channels.c1.transactionCapacity  = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels  = c1
    a1.sinks.k1.channel  = c1
    重点说明几个属性
    • a1.sinks.k1.serializer.payloadColumn 中列出了全部的列名。
    • a1.sinks.k1.serializer 设置了 flume serializer 的处理类。BaimiAsyncHbaseEventSerializer 类会读取 payloadColumn 的内容,并以逗号分隔,从而得到全部的列名。

    BaimiAsyncHbaseEventSerializer类

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    
    package org.apache.flume.sink.hbase;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.FlumeException;
    import org.hbase.async.AtomicIncrementRequest;
    import org.hbase.async.PutRequest;
    import org.apache.flume.conf.ComponentConfiguration;
    import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
    
    import com.google.common.base.Charsets;
    
    public class BaimiAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
      private byte[] table;
      private byte[] cf;
      private byte[][] payload;
      private byte[][] payloadColumn;
      private final String payloadColumnSplit = "\^A";
      private byte[] incrementColumn;
      private String rowSuffix;
      private String rowSuffixCol;
      private byte[] incrementRow;
      private KeyType keyType;
    
      @Override
      public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.cf = cf;
      }
    
      @Override
      public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        if(payloadColumn != null){
          byte[] rowKey;
          try {
            switch (keyType) {
              case TS:
                rowKey = SimpleRowKeyGenerator.getTimestampKey(rowSuffix);
                break;
              case TSNANO:
                rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowSuffix);
                break;
              case RANDOM:
                rowKey = SimpleRowKeyGenerator.getRandomKey(rowSuffix);
                break;
              default:
                rowKey = SimpleRowKeyGenerator.getUUIDKey(rowSuffix);
                break;
            }
    
    	// for 循环。提交全部列和对于数据的put请求。
    	for (int i = 0; i < this.payload.length; i++)
    	{
            	PutRequest putRequest =  new PutRequest(table, rowKey, cf,payloadColumn[i], payload[i]);
            	actions.add(putRequest);
    	}
    
          } catch (Exception e){
            throw new FlumeException("Could not get row key!", e);
          }
        }
        return actions;
      }
    
      public List<AtomicIncrementRequest> getIncrements(){
        List<AtomicIncrementRequest> actions = new
            ArrayList<AtomicIncrementRequest>();
        if(incrementColumn != null) {
          AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
              incrementRow, cf, incrementColumn);
          actions.add(inc);
        }
        return actions;
      }
    
      @Override
      public void cleanUp() {
        // TODO Auto-generated method stub
    
      }
    
      @Override
      public void configure(Context context) {
        String pCol = context.getString("payloadColumn", "pCol");
        String iCol = context.getString("incrementColumn", "iCol");
        rowSuffixCol = context.getString("rowPrefixCol", "mac");
        String suffix = context.getString("suffix", "uuid");
        if(pCol != null && !pCol.isEmpty()) {
          if(suffix.equals("timestamp")){
            keyType = KeyType.TS;
          } else if (suffix.equals("random")) {
            keyType = KeyType.RANDOM;
          } else if(suffix.equals("nano")){
            keyType = KeyType.TSNANO;
          } else {
            keyType = KeyType.UUID;
          }
     
         	// 从配置文件里读出column。 
         	String[] pCols = pCol.replace(" ", "").split(",");
         	payloadColumn = new byte[pCols.length][];
         	for (int i = 0; i < pCols.length; i++)
    	{
    		// 列名转为小写
    		payloadColumn[i] = pCols[i].toLowerCase().getBytes(Charsets.UTF_8);
    	}
        }
    
        if(iCol != null && !iCol.isEmpty()) {
          incrementColumn = iCol.getBytes(Charsets.UTF_8);
        }
        incrementRow =
            context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
      }
    
      @Override
      public void setEvent(Event event) {
    	String strBody = new String(event.getBody());
    	String[] subBody = strBody.split(this.payloadColumnSplit);
    	if (subBody.length == this.payloadColumn.length)
    	{
    		this.payload = new byte[subBody.length][];
    		for (int i = 0; i < subBody.length; i++)
    		{
    			this.payload[i] = subBody[i].getBytes(Charsets.UTF_8);
    			if ((new String(this.payloadColumn[i]).equals(this.rowSuffixCol)))
    			{
    				// rowkey 前缀是某一列的值, 默认情况是mac地址
    				this.rowSuffix = subBody[i];
    			}
    		}
    	}
      }
    
      @Override
      public void configure(ComponentConfiguration conf) {
        // TODO Auto-generated method stub
      }
    }
    重点可以查看 setEvent、configure、getActions 三个函数。

    • configure函数:读取flume配置文件内容,包括列名、rowkey后缀等信息。
    • setEvent函数:获取flume event 内容,将其保存到payload数组中。
    • getActions函数:创建PutRequest实例。将rowkey,columnfamily,column,value等信息写入putrequest实例中。

    源代码编译和运行

         编写好自定义的BaimiAsyncHbaseEventSerializer类后,接下来需要编译源代码,生成flume-ng-hbase-sink.*.jar包,替换flume中原来的flume-ng-hbase-sink.*.jar包。
    • 下载flume 1.5 源代码,解压后进入文件夹flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/
    • 复制上面的BaimiAsyncHbaseEventSerializer类到上面的文件夹中。
    • 进入flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/,执行mvn编译命令【mvn install -Dmaven.test.skip=true】
    • mvn编译后会在flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/target文件夹下生成flume-ng-hbase-sink-1.5.0.jar,将这个jar包替换$FLUME_HOME/lib下的jar包
    • 执行flume运行命令【flume-ng agent -c . -f conf/spoolDir.conf -n a1  -Dflume.root.logger=INFO,console】



  • 相关阅读:
    jQuery如何获取选中单选按钮radio的值
    java计算出字符串中所有的数字求和?
    java 多线程对List中的数据进行操作
    MongoDB
    CentOS网卡一致性命名
    linux之list_for_each和list_for_each_entry函数
    linux开机启动项
    linux学习参考网站
    linux内核态获取纳秒ns时间
    Linux内核kfifo
  • 原文地址:https://www.cnblogs.com/zfyouxi/p/5196507.html
Copyright © 2011-2022 走看看