zoukankan      html  css  js  c++  java
  • mapreduce中实现对hbase中表数据的添加

    mapreduce中实现对hbase中表数据的添加

     

      参考网址:http://www.javabloger.com/article/hadoop-mapreduce-hbase.html

           根据参考网址中的小实例,自己亲自实现了一下,记录一下自己对该程序的一些理解。

           实例:先将数据文件上传到HDFS,然后用MapReduce进行处理,将处理后的数据插入到hbase中。代码如下:

           首先是Mapper:

    复制代码
    package txt_to_hbase;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class THMapper extends Mapper<LongWritable,Text,Text,Text>{
    public void map(LongWritable key,Text value,Context context){
    String[] items = value.toString().split(" ");
    String k = items[0];
    String v = items[1];
    System.out.println("key:"+k+","+"value:"+v);
    try {

    context.write(new Text(k), new Text(v));

    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }

    }

    }
    复制代码

      然后是Reduce:

    复制代码
    package txt_to_hbase;

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.Text;

    public class THReducer extends TableReducer<Text,Text,ImmutableBytesWritable>{
    public void reduce(Text key,Iterable<Text> value,Context context){
    String k = key.toString();
    String v = value.iterator().next().toString(); //由数据知道value就只有一行
    Put putrow = new Put(k.getBytes());
    putrow.add("f1".getBytes(), "qualifier".getBytes(), v.getBytes());
    try {

    context.write(new ImmutableBytesWritable(key.getBytes()), putrow);

    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }

    }

    }
    复制代码

      然后是Driver:

    复制代码
    package txt_to_hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;

    public class THDriver extends Configured implements Tool{

    @Override
    public int run(String[] arg0) throws Exception {
    // TODO Auto-generated method stub
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum.", "localhost"); //千万别忘记配置

    Job job = new Job(conf,"Txt-to-Hbase");
    job.setJarByClass(TxtHbase.class);

    Path in = new Path("/home/daisy/inout/txthbase/");

    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, in);

    job.setMapperClass(THMapper.class);
    job.setReducerClass(THReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    TableMapReduceUtil.initTableReducerJob("tab1", THReducer.class, job);

    job.waitForCompletion(true);
    return 0;
    }

    }
    复制代码

      最后是主类:

    复制代码
    package txt_to_hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class TxtHbase {
    public static void main(String [] args) throws Exception{
    int mr;
    mr = ToolRunner.run(new Configuration(),new THDriver(),args);
    System.exit(mr);
    }
    }
    复制代码


      输入文件是3个txt文件,每个txt中的文件内容均是如下格式:

    1 name1--txt1-www.javabloger.com

    2 name2--txt1

    3 name3--txt1

    4 name4--txt1

    5 name5--txt1

      通过以上代码,mapreduce实现之后,在hbase的shell中查看tab1表,如下:

    复制代码
    hbase(main):009:0> scan 'tab1'
    ROW COLUMN+CELL
    1 column=f1:qualifier, timestamp=1320235555118, value=name1--txt1-www.javabloger.com
    10 column=f1:qualifier, timestamp=1320235555118, value=name10--txt2
    11 column=f1:qualifier, timestamp=1320235555118, value=name11--txt3-www.javabloger.com
    12 column=f1:qualifier, timestamp=1320235555118, value=name12--txt3
    13 column=f1:qualifier, timestamp=1320235555118, value=name13--txt3
    14 column=f1:qualifier, timestamp=1320235555118, value=name14--txt3
    15 column=f1:qualifier, timestamp=1320235555118, value=name15--txt3
    2 column=f1:qualifier, timestamp=1320235555118, value=name2--txt1
    3 column=f1:qualifier, timestamp=1320235555118, value=name3--txt1
    4 column=f1:qualifier, timestamp=1320235555118, value=name4--txt1
    5 column=f1:qualifier, timestamp=1320235555118, value=name5--txt1
    6 column=f1:qualifier, timestamp=1320235555118, value=name6--txt2-www.javabloger.com
    7 column=f1:qualifier, timestamp=1320235555118, value=name7--txt2
    8 column=f1:qualifier, timestamp=1320235555118, value=name8--txt2
    9 column=f1:qualifier, timestamp=1320235555118, value=name9--txt2
    15 row(s) in 0.0570 seconds
    复制代码

      Map跟普通的mapreduce函数没有多大区别,正常的TextInputFormat方式输入,按行读取。

           Reduce中要把处理之后的结果写入hbase的表中,所以与普通的mapreduce程序有些区别,由以上代码可以知道,reduce类继承的是TableReducer,通过查询API(如下图1)知道,它也是一种基本的Reducer类,与其他的reduce类一样,它的输入k/v对是对应Map的输出k/v对,它的输出key可以是任意的类型,但是value必须是一个put或delete实例。

                                                                            图1:TableReducer类详解  

      Reduce的输出key是ImmutableWritable类型(org.apache.hadoop.hase.io),API中的解释,它是一个可以用作key或value类型的字节序列,该类型基于BytesWritable,不能调整大小。Reduce的输出value是一个put。如上面代码:   context.write(new ImmutableBytesWritable(key.getBytes()), putrow);

           Driver中job配置的时候没有设置 job.setReduceClass(); 而是用 TableMapReduceUtil.initTableReducerJob("tab1", THReducer.class, job); 来执行reduce类。

           TableMapReduceUtil类(org.apache.hadoop.hbase.mapreduce):a utility for TableMapper or TableReducer。因为本例子中的reduce继承的是TableReducer,所以也就解释了用TableMapReduceUtil来执行的原因。该类的方法有:addDependencyJars(),initTableMapperJob(),initTableReducerJob(),limitNumReduceTasks(),setNumReduceTasks()等,详细包括参数等可以查看API。

           同时注意本程序代码的格式,将Map,Reduce,以及Job的配置分离,比较清晰。之前写代码喜欢把map,reduce 以及job配置全都写在一个类中,可能这是一种不太好的习惯。这里注意Driver类,要继承 Configured 类和实现 Tool 接口,以及实现Tool中的run方法,在run方法中对job进行配置。 同时main函数中用ToolRunner.run() 方法来调用Driver类。

           本人的一点理解,如有错误,欢迎指正,也欢迎大家一起交流mapreduce编程的知识,我的email:dongtingting8877@163.com  。



  • 相关阅读:
    1144 The Missing Number (20分)
    1145 Hashing
    1146 Topological Order (25分)
    1147 Heaps (30分)
    1148 Werewolf
    1149 Dangerous Goods Packaging (25分)
    TypeReference
    Supervisor安装与配置()二
    谷粒商城ES调用(十九)
    Found interface org.elasticsearch.common.bytes.BytesReference, but class was expected
  • 原文地址:https://www.cnblogs.com/liqizhou/p/2652611.html
Copyright © 2011-2022 走看看