  Importing HDFS Text Data into HBase with MapReduce

    HBase itself provides many ways to import data; two of them are commonly used:

    1. Use the TableOutputFormat provided by HBase: a MapReduce job writes the data into HBase through this output format
    2. Use the native HBase client API directly (a minimal sketch follows)
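
    For reference, the second approach looks roughly like the sketch below. This is a minimal, hedged example using the same pre-1.0 client API (HTable, Put.add) that the reducer later in this post uses; the class name ClientApiDemo and the sample row/column values are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ClientApiDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();    // reads hbase-site.xml from the classpath
            HTable table = new HTable(conf, "mytable");           // the table created in the shell below
            try {
                Put put = new Put(Bytes.toBytes("r0"));           // row key
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("c0"), Bytes.toBytes("value0"));
                table.put(put);                                   // one RPC per put unless write buffering is enabled
            } finally {
                table.close();
            }
        }
    }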

    This post demonstrates how to read data from a file with a MapReduce job and write it into HBase.

    First start Hadoop and HBase, then create an empty table into which the data will be imported:

    hbase(main):006:0> create 'mytable','cf'
    0 row(s) in 10.8310 seconds
    
    => Hbase::Table - mytable
    hbase(main):007:0> list
    TABLE                                                                                                   
    mytable                                                                                                 
    1 row(s) in 0.1220 seconds
    
    => ["mytable"]
    hbase(main):008:0> scan 'mytable'
    ROW                         COLUMN+CELL                                                                 
    0 row(s) in 0.2130 seconds

    I. Example Program

    The example program below uses TableOutputFormat to import text data in a fixed format from HDFS into HBase.

    First create the MapReduce project, with the following directory layout:

    Hdfs2HBase/
    ├── classes
    └── src
        ├── Hdfs2HBase.java
        ├── Hdfs2HBaseMapper.java
        └── Hdfs2HBaseReducer.java

    Hdfs2HBaseMapper.java

    package com.lisong.hdfs2hbase;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class Hdfs2HBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
            public void map(LongWritable key, Text line, Context context) throws IOException,InterruptedException {
                    String lineStr = line.toString();
                    int index = lineStr.indexOf(":");
                    String rowkey = lineStr.substring(0, index);
                    String left = lineStr.substring(index+1);
                    context.write(new Text(rowkey), new Text(left));
            }
    }

    Hdfs2HBaseReducer.java

    package com.lisong.hdfs2hbase;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class Hdfs2HBaseReducer extends Reducer<Text, Text, ImmutableBytesWritable, Put> {
            public void reduce(Text rowkey, Iterable<Text> value, Context context) throws IOException,InterruptedException {
                    String k = rowkey.toString();
                    for(Text val : value) {
                            Put put = new Put(k.getBytes());
                            String[] strs = val.toString().split(":");
                            String family = strs[0];
                            String qualifier = strs[1];
                            String v = strs[2];
                            put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
                            context.write(new ImmutableBytesWritable(k.getBytes()), put);
                    }
            }
    }

    Hdfs2HBase.java

    package com.lisong.hdfs2hbase;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class Hdfs2HBase {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if(otherArgs.length != 2) {
                System.err.println("Usage: hdfs2hbase <infile> <table>");
                System.exit(2);
            }
    
            Job job = new Job(conf, "hdfs2hbase");
            job.setJarByClass(Hdfs2HBase.class);
            job.setMapperClass(Hdfs2HBaseMapper.class);
            job.setReducerClass(Hdfs2HBaseReducer.class);
    
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Put.class);
    
            job.setOutputFormatClass(TableOutputFormat.class);
    
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);
    
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }

    Configure the compile-time dependencies for javac:

    $HADOOP_HOME/share/hadoop/common/hadoop-common-2.4.1.jar
    $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar
    $HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar

    Because the job writes to HBase, the jars under $HBASE_HOME/lib are needed in addition to the three jars above. For convenience, include all the dependencies in the CLASSPATH in /etc/profile:

    TEMP=`ls /home/hadoop/hbase/lib/*.jar`
    HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:$HBASE_JARS

    Compile

    $ javac -d classes/ src/*.java

    Package

    $ jar -cvf hdfs2hbase.jar classes

    Run

    Create a data.txt file with the following contents (each line is rowkey:columnfamily:qualifier:value; the column family is the cf created when the table was built):

    r1:cf:c1:value1 
    r2:cf:c2:value2 
    r3:cf:c3:value3
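
    To make the format concrete, this illustrative snippet (not part of the project) traces how one line is taken apart by the mapper and reducer above:

    public class LineFormatDemo {
        public static void main(String[] args) {
            String line = "r1:cf:c1:value1";
            String rowkey = line.substring(0, line.indexOf(":"));  // "r1" -> mapper output key
            String rest = line.substring(line.indexOf(":") + 1);   // "cf:c1:value1" -> mapper output value
            String[] strs = rest.split(":");                       // ["cf", "c1", "value1"] in the reducer
            // resulting Put: row "r1", family "cf", qualifier "c1", value "value1"
            System.out.println(rowkey + " -> " + strs[0] + ":" + strs[1] + " = " + strs[2]);
        }
    }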

    Copy the file to HDFS:

    $ hadoop/bin/hadoop fs -put data.txt /hbase

    Run the MapReduce job:

    $ hadoop/bin/hadoop jar Hdfs2HBase/hdfs2hbase.jar com.lisong.hdfs2hbase.Hdfs2HBase /hbase/data.txt mytable

    It fails with a NoClassDefFoundError (class definition not found):

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/io/ImmutableBytesWritable
        at com.lisong.hdfs2hbase.Hdfs2HBase.main(Hdfs2HBase.java:30)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        ...
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more

    The cause was that I had not added the HBase jars to HADOOP_CLASSPATH in hadoop-env.sh. Add the following:

    TEMP=`ls /home/hadoop/hbase/lib/*.jar`
    HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
    HADOOP_CLASSPATH=$HBASE_JARS

    Running it again, it now fails with an "Unable to initialize MapOutputCollector" error:

    15/08/10 08:55:44 WARN mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
    java.lang.NullPointerException
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1008)
        at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:401)
        ...
        at java.lang.Thread.run(Thread.java:745)
    15/08/10 08:55:44 INFO mapred.LocalJobRunner: map task executor complete.
    15/08/10 08:55:44 WARN mapred.LocalJobRunner: job_local2138114942_0001
    java.lang.Exception: java.io.IOException: Unable to initialize any output collector
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
    Caused by: java.io.IOException: Unable to initialize any output collector
        at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
        ...
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    15/08/10 08:55:44 INFO mapreduce.Job: Job job_local2138114942_0001 failed with state FAILED due to: NA
    15/08/10 08:55:45 INFO mapreduce.Job: Counters: 0

    The cause is that the map output key/value types were not specified. Add the following two lines to Hdfs2HBase.java:

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);     

    If the mapper's output types are not defined explicitly, job.setOutputKeyClass and job.setOutputValueClass set the output types of both the mapper and the reducer.

    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);

    But Hdfs2HBaseMapper's output types are Text/Text, while the job's output types are ImmutableBytesWritable/Put, so the map output types have to be specified separately.


    The modified Hdfs2HBase.java:

    package com.lisong.hdfs2hbase;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class Hdfs2HBase {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if(otherArgs.length != 2) {
                System.err.println("Usage: hdfs2hbase <infile> <table>");
                System.exit(2);
            }
    
            Job job = new Job(conf, "hdfs2hbase");
            job.setJarByClass(Hdfs2HBase.class);
            job.setMapperClass(Hdfs2HBaseMapper.class);
            job.setReducerClass(Hdfs2HBaseReducer.class);
    
            job.setMapOutputKeyClass(Text.class);    // +
            job.setMapOutputValueClass(Text.class);  // +
    
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Put.class);
    
            job.setOutputFormatClass(TableOutputFormat.class);
    
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);
    
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }

    Compile and package again; this time the job runs successfully.

    Scan the HBase table to verify that the data has been imported:

    hbase(main):001:0> scan 'mytable'
    ROW                         COLUMN+CELL                                                                 
     r1                         column=cf:c1, timestamp=1439223857492, value=value1                         
     r2                         column=cf:c2, timestamp=1439223857492, value=value2                         
     r3                         column=cf:c3, timestamp=1439223857492, value=value3                         
    3 row(s) in 1.3820 seconds

    As the scan shows, the data was imported successfully.

    Because every Put travels the normal client write path, with frequent communication with the RegionServers that store the data, TableOutputFormat is resource-intensive and not very efficient for one-off imports of large data sets; for that case HBase also offers a bulk-load path based on HFileOutputFormat.


    II. Extension: TableReducer

    We can rewrite Hdfs2HBaseReducer.java as follows; the effect is identical:

    package com.lisong.hdfs2hbase;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class Hdfs2HBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
        public void reduce(Text rowkey, Iterable<Text> value, Context context) throws IOException,InterruptedException {
            String k = rowkey.toString();
            for(Text val : value) {
                Put put = new Put(k.getBytes());
                String[] strs = val.toString().split(":");
                String family = strs[0];
                String qualifier = strs[1];
                String v = strs[2];
                put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
                context.write(new ImmutableBytesWritable(k.getBytes()), put);
            }
        }
    }

    Here the reducer extends TableReducer directly. TableReducer is a partially specialized Reducer that takes only three type parameters: the input key/value types correspond to the mapper's output, the output key can be any type, and the output value must be a Put or Delete instance.
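
    As a hedged aside, when the reducer extends TableReducer the driver can also be wired up with TableMapReduceUtil instead of configuring the output side by hand. A sketch of that variant (a fragment meant to replace the corresponding lines in Hdfs2HBase.java, not a complete class):

    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

    // Replaces job.setOutputFormatClass(TableOutputFormat.class), the OUTPUT_TABLE
    // setting, and the output key/value class calls: initTableReducerJob() configures
    // TableOutputFormat with the target table and also tries to ship the required
    // HBase jars with the job.
    TableMapReduceUtil.initTableReducerJob(otherArgs[1], Hdfs2HBaseReducer.class, job);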

    Compile, package, and run; the result is the same as before.






    Personal site: http://songlee24.github.com
