zoukankan      html  css  js  c++  java
  • 数据批量导入HBase

    测试数据:

    datas

    1001    lilei   17  13800001111
    1002    lily    16  13800001112
    1003    lucy    16  13800001113
    1004    meimei  16  13800001114

     数据批量导入使用mr,先生成HFile文件,然后再用completebulkload工具导入。

    1、需要先在hbase 创建表名:

    hbase> create 'student', {NAME => 'info'}

    maven pom.xml配置文件如下:

    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
    </dependency>

    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
    </dependency>

    <!-- hbase -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.0.0</version>
            </dependency>
            
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>1.0.0</version>
            </dependency>

    编写MapReduce代码如下:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /** 
     * @author 作者 E-mail: 
     * @version 创建时间:2016年3月2日 下午4:15:57 
     * 类说明 
     */
    public class CreateHfileByMapReduce {
    
        public static class MyBulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{
            @Override
            protected void setup( Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context )
                throws IOException, InterruptedException {
               
                super.setup( context );
            }
            @Override
            protected void map( LongWritable key, Text value,
                                Context context )
                throws IOException, InterruptedException {
                String[] split = value.toString().split("	"); // 根据实际情况修改
                if (split.length == 4){
                    byte[] rowkey = split[0].getBytes();
                    ImmutableBytesWritable imrowkey = new ImmutableBytesWritable( rowkey );
                    context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1])));
                    context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(split[2])));
                    context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(split[3])));
                }
            }
        }
        
        @SuppressWarnings( "deprecation" )
        public static void main( String[] args ) {
            if (args.length != 4){
                System.err.println("Usage: CreateHfileByMapReduce <table_name><data_input_path><hfile_output_path> ");
                System.exit(2);
            }
            
            String tableName = args[0];
            String inputPath  = args[1];
            String outputPath = args[2];
            
          /*  String tableName = "student";
            String inputPath  = "hdfs://node2:9000/datas";
            String outputPath = "hdfs://node2:9000/user/output";*/
            HTable hTable = null;
            Configuration conf = HBaseConfiguration.create();
            try {
               hTable  = new HTable(conf, tableName);
               Job job = Job.getInstance( conf, "CreateHfileByMapReduce");
               job.setJarByClass( CreateHfileByMapReduce.class );
               job.setMapperClass(MyBulkMapper.class);
               job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
               //
               HFileOutputFormat.configureIncrementalLoad(job, hTable);
               FileInputFormat.addInputPath( job, new Path(inputPath) );
               FileOutputFormat.setOutputPath( job, new Path(outputPath) );
               System.exit( job.waitForCompletion(true)? 0: 1 );
               
            }
            catch ( Exception e ) {
                
                e.printStackTrace();
            }
            
        }
    }

    注: 借助maven的assembly插件, 生成胖jar包(就是把依赖的zookeeper和hbase jar包都打到该MapReduce包中), 否则的话, 就需要用户静态配置, 在Hadoop的classpath中添加zookeeper和hbase的配置文件和相关jar包.

    最终的jar包为 bulk.jar, 主类名为cn.bd.batch.mr.CreateHfileByMapReduce, 生成HFile, 增量热载入hbase
    sudo -u hdfs hadoop jar <xxoo>.jar <MainClass> <table_name> <data_input_path> <hfile_output_path>
    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile_output_path> <table_name>

    hadoop jar bulk.jar cn.bd.batch.mr.CreateHfileByMapReduce student /datas /user/output

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/output student

    本文参考地址:http://www.cnblogs.com/mumuxinfei/p/3823367.html

  • 相关阅读:
    ES5 创建构造函数的私有属性
    js 触发打印操作
    创建 React 项目
    处理因使用 BigInt 等最新语法时 ts 编译报错
    TS 查找第三方声明文件
    Git 撤销工作区中的变动
    Git 查看文件修改状态
    Git 查看用户名和 Email
    查看某个 npm 包的所有发行版版本号,比如 vue
    Git 查看文件修改详情
  • 原文地址:https://www.cnblogs.com/zhanggl/p/5235912.html
Copyright © 2011-2022 走看看