zoukankan      html  css  js  c++  java
  • 自定义flume的hbase sink 的序列化程序

    package com.hello.hbase;
    
    import java.nio.charset.Charset;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.List;
    import java.util.Locale;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.regex.Pattern;

    import com.google.common.base.Charsets;
    import com.google.common.collect.Lists;
    import org.apache.commons.lang.RandomStringUtils;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.FlumeException;
    import org.apache.flume.conf.ComponentConfiguration;
    import org.apache.flume.sink.hbase.HbaseEventSerializer;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Row;
    
    
    public class FlumeHbaseEventSerializer implements HbaseEventSerializer {
      
        // Config vars  
        /** Regular expression used to parse groups from event data. */  
        public static final String REGEX_CONFIG = "regex";  
        public static final String REGEX_DEFAULT = " ";  
        /** Whether to ignore case when performing regex matches. */  
        public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";  
        public static final boolean INGORE_CASE_DEFAULT = false;  
        /** Comma separated list of column names to place matching groups in. */  
        public static final String COL_NAME_CONFIG = "colNames";  
        public static final String COLUMN_NAME_DEFAULT = "ip";  
        /** Index of the row key in matched regex groups */  
        public static final String ROW_KEY_INDEX_CONFIG = "rowKeyIndex";  
        /** Placeholder in colNames for row key */  
        public static final String ROW_KEY_NAME = "ROW_KEY";  
        /** Whether to deposit event headers into corresponding column qualifiers */  
        public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";  
        public static final boolean DEPOSIT_HEADERS_DEFAULT = false;  
        /** What charset to use when serializing into HBase's byte arrays */  
        public static final String CHARSET_CONFIG = "charset";  
        public static final String CHARSET_DEFAULT = "UTF-8";  
        /* 
         * This is a nonce used in HBase row-keys, such that the same row-key never 
         * gets written more than once from within this JVM. 
         */  
        protected static final AtomicInteger nonce = new AtomicInteger(0);  
        protected static String randomKey = RandomStringUtils.randomAlphanumeric(10);  
        protected byte[] cf;  
        private byte[] payload;  
        private List<byte[]> colNames = Lists.newArrayList();  
        private boolean regexIgnoreCase;  
        private Charset charset;  
        @Override  
        public void configure(Context context) {  
            String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);  
            regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, INGORE_CASE_DEFAULT);  
            context.getBoolean(DEPOSIT_HEADERS_CONFIG, DEPOSIT_HEADERS_DEFAULT);  
            Pattern.compile(regex, Pattern.DOTALL + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));  
            charset = Charset.forName(context.getString(CHARSET_CONFIG, CHARSET_DEFAULT));  
      
            String cols = new String(context.getString("columns"));  
            String colNameStr;  
            if (cols != null && !"".equals(cols)) {  
                colNameStr = cols;  
            } else {  
                colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);  
            }  
      
            String[] columnNames = colNameStr.split(",");  
            for (String s : columnNames) {  
                colNames.add(s.getBytes(charset));  
            }  
        }  
        
        @Override  
        public void configure(ComponentConfiguration conf) {}  
      
        @Override  
        public void initialize(Event event, byte[] columnFamily) {  
            event.getHeaders();  
            this.payload = event.getBody();  
            this.cf = columnFamily;  
        }  
        
        protected byte[] getRowKey(Calendar cal) {  
            String str = new String(payload, charset);  
            String tmp = str.replace(""", "");  
            String[] arr = tmp.split(" ");  
            String log_data = arr[4];
            String[] param_arr = log_data.split("&");
            String userid = param_arr[0];
            String itemid = param_arr[1];
            String type = param_arr[2];
            String ip_str = param_arr[3];
            
    //        String dataStr = arr[3].replace("[", "");  
    //        String rowKey = getDate2Str(dataStr) + "-" + clientIp + "-" + nonce.getAndIncrement();
            String rowKey = ip_str + "-" + nonce.getAndIncrement();
            
            return rowKey.getBytes(charset);  
        }  
      
        protected byte[] getRowKey() {  
            return getRowKey(Calendar.getInstance());  
        }  
    
        @Override  
        public List<Row> getActions() throws FlumeException {  
            List<Row> actions = Lists.newArrayList();  
            byte[] rowKey;  
      
            String body = new String(payload, charset);  
            String tmp = body.replace(""", "");  
    //        String[] arr = tmp.split(REGEX_DEFAULT); 
            String[] arr = tmp.split(" ");
            
            String log_data = arr[4];
            String[] param_arr = log_data.split("&");
            
            String userid = param_arr[0].split("=")[1];
            String itemid = param_arr[1].split("=")[1];
            String type = param_arr[2].split("=")[1];
            String ip_str = param_arr[3].split("=")[1];
                          
            System.out.println("===========");
            System.out.println("===========");
            System.out.println("===========");
            System.out.println("===========");
            System.out.println(userid);
            System.out.println(itemid);
            System.out.println(type);
            System.out.println(ip_str);
            System.out.println("===========");
            System.out.println("===========");
            System.out.println("===========");
            System.out.println("===========");
             
            try {  
                rowKey = getRowKey();
                Put put = new Put(rowKey);  
                put.add(cf, colNames.get(0), userid.getBytes(Charsets.UTF_8));  
                put.add(cf, colNames.get(1), itemid.getBytes(Charsets.UTF_8));  
                put.add(cf, colNames.get(2), type.getBytes(Charsets.UTF_8));
                put.add(cf, colNames.get(3), ip_str.getBytes(Charsets.UTF_8));
                actions.add(put);  
            } catch (Exception e) {  
                throw new FlumeException("Could not get row key!", e);  
            }  
            return actions;  
        }  
      
        @Override  
        public List<Increment> getIncrements() {  
            return Lists.newArrayList();  
        }  
      
        @Override  
        public void close() {}  
      
        public static String getDate2Str(String dataStr) {  
            SimpleDateFormat formatter = null;  
            SimpleDateFormat format = null;  
            Date date = null;  
            try {  
                formatter = new SimpleDateFormat("dd/MMM/yyyy:hh:mm:ss", Locale.ENGLISH);  
                date = formatter.parse(dataStr);  
                format = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
      
            return format.format(date);  
        }  
    }
  • 相关阅读:
    apply call this arguments caller callee
    JavaScript 小刮号
    asp.net底层http管道
    JavaScript 使用方括号([])引用对象的属性和方法 createDelegate
    MSIL条件跳转(简单注释)
    微软MVP手把手教你如何修改.NET Framework
    简单操作IL文件
    Remoting入门实例
    AutoResetEvent和ManualResetEvent用法示例
    AutoResetEvent和ManualResetEvent用法
  • 原文地址:https://www.cnblogs.com/luozeng/p/9267276.html
Copyright © 2011-2022 走看看