Using bulkload to batch-write data into HBase

    1. Data format

    Before writing, the data first needs to be arranged into a suitable format and saved to HDFS. The layout used in this example is shown below (fields separated by a tab):

    row1	N
    row2	M
    row3	B
    row4	V
    row5	N
    row6	M
    row7	B
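
    Before running the job, the target table must already exist in HBase and the input file must be on HDFS. A minimal preparation sketch (the local file name data.txt is only an illustrative example; the table name testdata and column family cf match the rest of this post):

    # create the target table with column family "cf" (run inside `hbase shell`)
    create 'testdata', 'cf'

    # upload the tab-separated input file to the HDFS directory used as the job input
    hdfs dfs -mkdir -p Test
    hdfs dfs -put data.txt Test/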

    2. Code

    Suppose we want to write data in the above format into HBase, with column family cf and column qualifier colb. The following code can be used as a reference:

    package com.testdata;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TestBulkLoad {

        /** Turns each tab-separated input line into a (rowkey, Put) pair for the HFile writer. */
        public static class LoadMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put> {

            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Each line looks like "row1<TAB>N": the row key, then the column value.
                String[] values = value.toString().split("\t");
                if (values.length == 2) {
                    byte[] rowkey = Bytes.toBytes(values[0]);
                    byte[] colValue = Bytes.toBytes(values[1]);
                    byte[] family = Bytes.toBytes("cf");
                    byte[] column = Bytes.toBytes("colb");
                    ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey);
                    Put testput = new Put(rowkey);
                    // Pre-1.0 API; on HBase 1.x+ use testput.addColumn(family, column, colValue).
                    testput.add(family, column, colValue);
                    context.write(rowkeyWritable, testput);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            if (args.length != 4) {
                System.err.println("Usage: TestBulkLoad <input> <hfile output> <split size MB> <table>");
                System.exit(1);
            }

            String in = args[0];
            String out = args[1];
            int unitmb = Integer.valueOf(args[2]);
            String tbname = args[3];

            // Force the input split size so each mapper processes roughly unitmb MB of input.
            Configuration conf = new Configuration();
            conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(unitmb * 1024 * 1024));
            conf.set("mapred.min.split.size", String.valueOf(unitmb * 1024 * 1024));
            conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(unitmb * 1024 * 1024));
            conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(unitmb * 1024 * 1024));

            Job job = Job.getInstance(conf);
            FileInputFormat.addInputPath(job, new Path(in));
            FileOutputFormat.setOutputPath(job, new Path(out));
            job.setMapperClass(LoadMapper.class);
            job.setReducerClass(PutSortReducer.class);            // ships with HBase; sorts Puts into HFile order
            job.setOutputFormatClass(HFileOutputFormat2.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            job.setJarByClass(TestBulkLoad.class);

            // Configure the job for the target table (region boundaries, compression, etc.).
            Configuration hbaseconf = HBaseConfiguration.create();
            HTable table = new HTable(hbaseconf, tbname);
            HFileOutputFormat2.configureIncrementalLoad(job, table);

            // Generate the HFiles, then move them into the table's regions.
            job.waitForCompletion(true);
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseconf);
            loader.doBulkLoad(new Path(out), table);
        }
    }

    This code uses a MapReduce job to turn the input into HFiles and then calls the bulk-load API to move them into HBase. PutSortReducer is a reducer class that ships with HBase, so there is no need to write one yourself.
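
    As a side note, the final load step does not have to run inside the program: if only the HFiles are generated (for example, by stopping before the doBulkLoad call), HBase's bundled completebulkload tool can move them into the table from the command line. A sketch, using the HFile output directory hbasedata and the table testdata from the next section:

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hbasedata testdata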

    3. Execution

    The data is stored in a plain text file, the code above is packaged into a jar named bulkload.jar, and the target HBase table is named testdata. The four arguments are the HDFS input path (Test), the HFile output path (hbasedata), the split size in MB (64), and the table name (testdata). Note: set HADOOP_CLASSPATH first so the HBase jars are on the classpath, otherwise the job may fail to find the HBase classes.

    export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:$HADOOP_CLASSPATH
    hadoop jar ./Downloads/bulkload.jar com.testdata.TestBulkLoad Test hbasedata 64 testdata
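
    Alternatively, the HBase dependencies can be picked up with the hbase mapredcp helper, which prints the jars that MapReduce jobs need (this assumes the hbase command is on the PATH):

    export HADOOP_CLASSPATH=$(hbase mapredcp):$HADOOP_CLASSPATH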

    4. Result

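    Once the job and the bulk load complete, the table can be checked from the HBase shell; each of the seven input rows should appear with a cf:colb cell holding the value from the text file:

    # run inside `hbase shell`
    scan 'testdata'
    count 'testdata'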

    Original post: https://www.cnblogs.com/learn21cn/p/6158523.html