HBase: importing data from HDFS

    Requirement: import the data in a file on HDFS into HBase.

    There are two ways to meet this requirement: write a custom MapReduce job, or use the Import tool that HBase ships with.

    1. The data in HDFS

    Each line has the form: id name age gender birthday (tab-separated).

    (my_python_env)[root@hadoop26 ~]# hadoop fs  -cat /t1/*
    1    zhangsan    10    male    NULL
    2    lisi    NULL    NULL    NULL
    3    wangwu    NULL    NULL    NULL
    4    zhaoliu    NULL    NULL    1993
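    For reference, one hypothetical way to produce such a tab-separated file and place it under /t1 (the file name t1.txt is an assumption):

    # hypothetical local test file; the literal string NULL marks a missing field
    echo -e "1\tzhangsan\t10\tmale\tNULL" > t1.txt
    hadoop fs -mkdir -p /t1
    hadoop fs -put t1.txt /t1/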

    2. Custom MapReduce job

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class HdfsToHBase {
        public static void main(String[] args) throws Exception {
            // args[0] = HDFS input path, args[1] = target HBase table
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop26:2181");
            conf.set("hbase.rootdir", "hdfs://hadoop26:9000/hbase");
            conf.set(TableOutputFormat.OUTPUT_TABLE, args[1]);

            Job job = Job.getInstance(conf, HdfsToHBase.class.getSimpleName());
            TableMapReduceUtil.addDependencyJars(job);
            job.setJarByClass(HdfsToHBase.class);

            job.setMapperClass(HdfsToHBaseMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setReducerClass(HdfsToHBaseReducer.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            job.setOutputFormatClass(TableOutputFormat.class);
            job.waitForCompletion(true);
        }

        // Mapper: emit the id as the key and the remaining tab-separated fields as the value
        public static class HdfsToHBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
            private Text outKey = new Text();
            private Text outValue = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] splits = value.toString().split("\t");
                outKey.set(splits[0]);
                outValue.set(splits[1] + "\t" + splits[2] + "\t" + splits[3] + "\t" + splits[4]);
                context.write(outKey, outValue);
            }
        }

        // Reducer: build one Put per row key, skipping any field whose value is the literal string "NULL"
        public static class HdfsToHBaseReducer extends TableReducer<Text, Text, NullWritable> {
            @Override
            protected void reduce(Text k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
                // use toString() rather than Text.getBytes(), whose backing array may be longer than the row key
                Put put = new Put(Bytes.toBytes(k2.toString()));
                for (Text v2 : v2s) {
                    String[] splits = v2.toString().split("\t");
                    // Put.add is deprecated in favor of addColumn in newer HBase releases
                    if (splits[0] != null && !"NULL".equals(splits[0])) {
                        put.add(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(splits[0]));
                    }
                    if (splits[1] != null && !"NULL".equals(splits[1])) {
                        put.add(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(splits[1]));
                    }
                    if (splits[2] != null && !"NULL".equals(splits[2])) {
                        put.add(Bytes.toBytes("f1"), Bytes.toBytes("gender"), Bytes.toBytes(splits[2]));
                    }
                    if (splits[3] != null && !"NULL".equals(splits[3])) {
                        put.add(Bytes.toBytes("f1"), Bytes.toBytes("birthday"), Bytes.toBytes(splits[3]));
                    }
                }
                context.write(NullWritable.get(), put);
            }
        }
    }

    2.1 Package and run

    First, create a table in HBase:

    hbase(main):006:0> create 'table1','f1'
    0 row(s) in 0.4240 seconds
    
    => Hbase::Table - table1
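    The shell command above is enough; as a minimal sketch, the same table could also be created from Java with the HBase 1.x client API (the class name CreateTable1 is an assumption, and the quorum setting matches the job above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    // Minimal sketch: create 'table1' with column family 'f1'
    public class CreateTable1 {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop26:2181");
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin()) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("table1"));
                desc.addFamily(new HColumnDescriptor("f1"));
                admin.createTable(desc);
            }
        }
    }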

    Then run the job:

    hadoop jar HdfsToHBase.jar com.lanyun.hadoop2.HdfsToHBase /t1/part* table1

    Finally, look at the data in table1:

    hbase(main):014:0* scan 'table1'
    ROW                                              COLUMN+CELL                                                                                                                                 
     1                                               column=f1:age, timestamp=1469069255119, value=10                                                                                            
     1                                               column=f1:gender, timestamp=1469069255119, value=male                                                                                       
     1                                               column=f1:name, timestamp=1469069255119, value=zhangsan                                                                                     
     2                                               column=f1:name, timestamp=1469069255119, value=lisi                                                                                         
     3                                               column=f1:name, timestamp=1469069255119, value=wangwu                                                                                       
     4                                               column=f1:birthday, timestamp=1469069255119, value=1993                                                                                     
     4                                               column=f1:name, timestamp=1469069255119, value=zhaoliu                                                                                      
    4 row(s) in 0.0430 seconds
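    As a quick cross-check, a small sketch (the class name ReadRow1 is an assumption) that reads row "1" back with the Java client:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    // Minimal sketch: fetch row "1" from table1 and print the f1:name cell
    public class ReadRow1 {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop26:2181");
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("table1"))) {
                Result result = table.get(new Get(Bytes.toBytes("1")));
                // expected output: zhangsan
                System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))));
            }
        }
    }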

    3. Using the Import tool provided by HBase

    First, check its usage:

    (my_python_env)[root@hadoop26 ~]# hbase org.apache.hadoop.hbase.mapreduce.Import 
    ERROR: Wrong number of arguments: 0
    Usage: Import [options] <tablename> <inputdir>
    By default Import will load data directly into HBase. To instead generate
    HFiles of data to prepare for a bulk data load, pass the option:
      -Dimport.bulk.output=/path/for/output

    Create table2 in HBase:

    hbase(main):016:0> create 'table2','f1'
    0 row(s) in 0.4080 seconds
    
    => Hbase::Table - table2

    Run the import from the command line:

    hbase org.apache.hadoop.hbase.mapreduce.Import table2 /t2

    Check the data in table2:

    hbase(main):018:0> scan 'table2'
    ROW                                              COLUMN+CELL                                                                                                                                 
     1                                               column=f1:age, timestamp=1468824267106, value=10                                                                                            
     1                                               column=f1:gender, timestamp=1468824289990, value=male                                                                                       
     1                                               column=f1:name, timestamp=1468824137463, value=zhangsan                                                                                     
     2                                               column=f1:name, timestamp=1468824236014, value=lisi                                                                                         
     3                                               column=f1:name, timestamp=1468824247109, value=wangwu                                                                                       
     4                                               column=f1:birthday, timestamp=1468825870158, value=1993                                                                                     
     4                                               column=f1:name, timestamp=1468825659207, value=zhaoliu                                                                                      
    4 row(s) in 0.0440 seconds
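    The usage text above also mentions writing HFiles for a bulk load instead of live Puts. A hypothetical variant of the same import (the /t2_hfiles output path is an assumption) would generate HFiles first and then bulk-load them:

    # write HFiles to an HDFS directory instead of writing directly into the table
    hbase org.apache.hadoop.hbase.mapreduce.Import -Dimport.bulk.output=/t2_hfiles table2 /t2
    # bulk-load the generated HFiles into table2
    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /t2_hfiles table2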

    4. Note

    The Import tool is convenient, but it can only import data that was previously exported with HBase's Export tool.
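    In other words, the typical round trip is Export first, then Import; assuming the /t2 directory used above was itself produced by Export, the pair of commands would look like:

    # export table1 as SequenceFiles to an HDFS directory
    hbase org.apache.hadoop.hbase.mapreduce.Export table1 /t2
    # import that export into table2
    hbase org.apache.hadoop.hbase.mapreduce.Import table2 /t2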

    Original article: https://www.cnblogs.com/dongdone/p/5689370.html