HBase Bulk Loading

    There are three ways to import data into HBase: (1) a MapReduce job that writes through TableOutputFormat; (2) the HBase client API; (3) Bulk Loading. For importing large volumes of data, the third approach is the most efficient.

    The Bulk Loading workflow is: first load the data (from MySQL, Oracle, plain text files, etc.) into HDFS, then run a MapReduce job that writes it out as HFiles (using HFileOutputFormat). Finally, use HBase's CompleteBulkLoad tool (LoadIncrementalHFiles) to move the HFiles into HBase. This last step is fast, uses little memory, and does not disturb normal operation of the online HBase cluster, because the data does not pass through the WAL or the MemStore.

     

    Notes:

    (1) Configure a total order partitioner.

    (2) The number of reducers must match the number of regions in the target table.

    (3) The map output key/value types must match what HFileOutputFormat expects (ImmutableBytesWritable keys with KeyValue or Put values).

    (4) Use KeyValueSortReducer (or PutSortReducer) as the reducer.

    In practice, HFileOutputFormat.configureIncrementalLoad() handles (1), (2) and (4) automatically from the table's current regions; see the sketch below.
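
    A minimal sketch of that wiring, assuming the target table already exists (the class, job and table names here are illustrative, not from the original post):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadWiring {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // (2) the target table must already exist; its region boundaries drive the partitioning
        HTable table = new HTable(conf, "wordcount");
        Job job = new Job(conf, "bulk-load-prepare");
        // (3) the map output types that HFileOutputFormat expects
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        // (1), (2), (4): configureIncrementalLoad() installs a TotalOrderPartitioner built
        // from the table's region boundaries, sets one reducer per region, and selects
        // KeyValueSortReducer (or PutSortReducer when the map output value class is Put)
        HFileOutputFormat.configureIncrementalLoad(job, table);
      }
    }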

    Typical use cases:

    (1) Loading an existing dataset when a cluster first goes online.

    (2) Incremental loads, e.g. periodically importing MySQL (Oracle) data into HBase.

    (3) Frequent large batch imports.

    Loading a CSV file with the bundled importtsv tool:


    hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv \
      -Dimporttsv.separator=, \
      -Dimporttsv.bulk.output=output \
      -Dimporttsv.columns=HBASE_ROW_KEY,f:count \
      wordcount word_count.csv

    The input file's format is rowkey,value: with -Dimporttsv.columns=HBASE_ROW_KEY,f:count, the first CSV field becomes the row key and the second becomes the value of column f:count (for example, a hypothetical line "hello,42" would put 42 into f:count of row "hello").

    The target table is wordcount and the input data file is word_count.csv.

    Note that running the command this way does not create the wordcount table, and nothing is written to the table yet; the job only produces HFiles under the output directory.
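
    Since the target table has to exist before the HFiles can be prepared and loaded (its regions determine how the output is partitioned), create it first, e.g. in the HBase shell or via the client API. A minimal, hypothetical sketch using the 0.94-era Java API, matching the table name and column family from the command above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class CreateWordcountTable {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor("wordcount"); // table name used above
        desc.addFamily(new HColumnDescriptor("f"));                // column family "f"
        admin.createTable(desc); // one region by default; pre-split for large datasets
        admin.close();
      }
    }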

    Then run

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount

    to move the generated HFiles into the table and complete the import.


    The same import can also be done in a single step, without the intermediate HFile directory:

    hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv \
      -Dimporttsv.separator=, \
      -Dimporttsv.columns=HBASE_ROW_KEY,f:count \
      wordcount word_count.csv

    Without -Dimporttsv.bulk.output, importtsv writes directly into the table (through the normal Put path rather than HFiles).

    Or use

    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-VERSION.jar completebulkload <hdfs://storefileoutput> <tablename>

    which is the same load step invoked through hadoop jar: it likewise imports already-generated HFiles directly into the table in one command.
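
    The load step can also be triggered from Java, which is what completebulkload does under the hood. A minimal sketch, assuming the HFiles already sit in an HDFS directory (the path and table name below are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadRunner {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "wordcount");              // target table (placeholder)
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path("/user/hbase/output"), table);  // HFile directory (placeholder)
      }
    }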


    Below is an example of a MapReduce job that generates HFiles. It consists of three classes, each in its own source file: the job driver (Driver), the mapper (HBaseKVMapper), and an enum of column names (HColumnEnum).


    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    /**
     * HBase bulk import example<br>
     * Data preparation MapReduce job driver
     * <ol>
     * <li>args[0]: HDFS input path
     * <li>args[1]: HDFS output path
     * <li>args[2]: HBase table name
     * </ol>
     */
    public class Driver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        /*
         * NBA Final 2010 game 1 tip-off time (seconds from epoch)
         * Thu, 03 Jun 2010 18:00:00 PDT
         */
        conf.setInt("epoch.seconds.tipoff", 1275613200);
        conf.set("hbase.table.name", args[2]);
        
        // Load hbase-site.xml
        HBaseConfiguration.addHbaseResources(conf);
        Job job = new Job(conf, "HBase Bulk Import Example");
        job.setJarByClass(HBaseKVMapper.class);
        job.setMapperClass(HBaseKVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setInputFormatClass(TextInputFormat.class);
        // Open the table with the job configuration (hbase-site.xml already added above)
        HTable hTable = new HTable(conf, args[2]);
        
        // Auto configure partitioner and reducer
        HFileOutputFormat.configureIncrementalLoad(job, hTable);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
      }
    }
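
    // --- HBaseKVMapper.java (separate source file) ---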
    import java.io.IOException;
    import java.util.Locale;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;
    import au.com.bytecode.opencsv.CSVParser;
    /**
     * HBase bulk import example
     * <p>
     * Parses Facebook and Twitter messages from CSV files and outputs
     * <ImmutableBytesWritable, KeyValue>.
     * <p>
     * The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
     * into the correct HBase table region.
     * <p>
     * The KeyValue value holds the HBase mutation information (column family,
     * column, and value)
     */
    public class HBaseKVMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
      final static byte[] SRV_COL_FAM = "srv".getBytes();
      final static int NUM_FIELDS = 16;
      CSVParser csvParser = new CSVParser();
      int tipOffSeconds = 0;
      String tableName = "";
      DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
          .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));
      ImmutableBytesWritable hKey = new ImmutableBytesWritable();
      KeyValue kv;
      /** {@inheritDoc} */
      @Override
      protected void setup(Context context) throws IOException,
          InterruptedException {
        Configuration c = context.getConfiguration();
        tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
        tableName = c.get("hbase.table.name");
      }
      /** {@inheritDoc} */
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        if (value.find("Service,Term,") > -1) {
          // Skip header
          return;
        }
        String[] fields = null;
        try {
          fields = csvParser.parseLine(value.toString());
        } catch (Exception ex) {
          context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
          return;
        }
        if (fields.length != NUM_FIELDS) {
          context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
          return;
        }
        // Get game offset in seconds from tip-off
        DateTime dt = null;
        try {
          dt = p.parseDateTime(fields[9]);
        } catch (Exception ex) {
          context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
          return;
        }
        int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
        String offsetForKey = String.format("%04d", gameOffset);
        String username = fields[2];
        if (username.equals("")) {
          username = fields[3];
        }
        // Key: e.g. "1200:twitter:jrkinley"
        hKey.set(String.format("%s:%s:%s", offsetForKey, fields[0], username)
            .getBytes());
        // Service columns
        if (!fields[0].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_SERVICE.getColumnName(), fields[0].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[1].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_TERM.getColumnName(), fields[1].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[2].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_USERNAME.getColumnName(), fields[2].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[3].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_NAME.getColumnName(), fields[3].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[4].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_UPDATE.getColumnName(), fields[4].getBytes());
          context.write(hKey, kv);
        }
        if (!fields[9].equals("")) {
          kv = new KeyValue(hKey.get(), SRV_COL_FAM,
              HColumnEnum.SRV_COL_TIME.getColumnName(), fields[9].getBytes());
          context.write(hKey, kv);
        }
        context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);
        /*
         * Output number of messages per quarter and before/after game. This should
         * correspond to the number of messages per region in HBase
         */
        if (gameOffset < 0) {
          context.getCounter("QStats", "BEFORE_GAME").increment(1);
        } else if (gameOffset < 900) {
          context.getCounter("QStats", "Q1").increment(1);
        } else if (gameOffset < 1800) {
          context.getCounter("QStats", "Q2").increment(1);
        } else if (gameOffset < 2700) {
          context.getCounter("QStats", "Q3").increment(1);
        } else if (gameOffset < 3600) {
          context.getCounter("QStats", "Q4").increment(1);
        } else {
          context.getCounter("QStats", "AFTER_GAME").increment(1);
        }
      }
    }
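
    // --- HColumnEnum.java (separate source file) ---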



    /**
     * HBase table columns for the 'srv' column family
     */
    public enum HColumnEnum {
      SRV_COL_SERVICE ("service".getBytes()),
      SRV_COL_TERM ("term".getBytes()),
      SRV_COL_USERNAME ("username".getBytes()),
      SRV_COL_NAME ("name".getBytes()),
      SRV_COL_UPDATE ("update".getBytes()),
      SRV_COL_TIME ("pdt".getBytes());
     
      private final byte[] columnName;
      
      HColumnEnum (byte[] column) {
        this.columnName = column;
      }
      public byte[] getColumnName() {
        return this.columnName;
      }
    }
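
    Once the job finishes, the HFiles written under args[1] can be loaded into the table with completebulkload / LoadIncrementalHFiles as shown earlier, which completes the bulk load.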











    Original post: https://www.cnblogs.com/cl1024cl/p/6205115.html