zoukankan      html  css  js  c++  java
  • MapReduce操作HBase

    本文来自:http://qa.taobao.com/?p=13914

    实例分析

    本文通过实例分析演示了如何使用MapReduce分析HBase中的数据。需要注意的是,这只是一种常规的方式(分析一张表中的数据,并把结果存到另一张表中),实际上并不局限于此,不过其他方式与此类似。下面以blog例子来进行示例分析,业务需求是这样:找到具有相同兴趣的人,我们简单定义为如果author之间article的tag相同,则认为两者有相同兴趣,并将分析结果保存到HBase。除了上面介绍的blog表外,我们新增一张表tag_friend,RowKey为tag,Value为authors,大概就是下面这样。

    blog示例:

    我们省略了一些跟分析无关的Column数据,上面的数据按前面描述的业务需求经过MapReduce分析,应该得到下面的结果

    tag_friend示例:

    实际的运算过程分析如下:

    代码如下:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class TestHBaseMP extends Configured implements Tool {

        public static class MapClass extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
           //默认的输入key value类型是<ImmutableBytesWritable,Result>,上面是输出key value类型

       public void map(ImmutableBytesWritable row, Result values,
                    Context context) throws IOException,InterruptedException {
               
                ImmutableBytesWritable value = null;
                String[] tags = null;
               
                for(KeyValue kv : values.raw()){
                   
                    if("author".equals(Bytes.toString(kv.getFamily()))
                            &&"nickname".equals(Bytes.toString(kv.getQualifier()))){
                        value = new ImmutableBytesWritable(kv.getValue());
                    }
                    if("article".equals(Bytes.toString(kv.getFamily()))
                            &&"tags".equals(Bytes.toString(kv.getQualifier()))){
                        tags = Bytes.toString(kv.getValue()).split(",");
                    }
                   
                }
               
                for(int i=0;i<tags.length;i++){
                    ImmutableBytesWritable key = new ImmutableBytesWritable(Bytes.toBytes(tags[i]));
                    context.write(key, value);
                   
                }
            }
        }

        public static class Reduce extends TableReducer<ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable>{
           //KEYIN, VALUEIN, KEYOUT

       public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable>values,
                    Context context) throws IOException,InterruptedException {
                String friends = "";
                for(ImmutableBytesWritable val : values){
                    friends += (friends.length()>0?",":"") + Bytes.toString(val.get());
                }
                Put put = new Put(key.get());
                put.add(Bytes.toBytes("person"),Bytes.toBytes("nicknames"),Bytes.toBytes(friends));
               
                context.write(key, put);
            }
        }


        //在提交作业时设置inputFormat为TableInputFormat,设置outputFormat为TableOutputFormat,可以借助TableMapReduceUtil类来简化编码。
        @Override
        public int run(String[] arg0) throws Exception {
            Configuration conf = new Configuration();
            conf = HBaseConfiguration.create(conf);
           
            Job job = new Job(conf,"test HBase mapreduce");
            job.setJarByClass(TestHBaseMP.class);
           
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("author"),Bytes.toBytes("nickname"));
            scan.addColumn(Bytes.toBytes("article"),Bytes.toBytes("tags"));
           
            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("blog"),
                                                                       scan,  //scan将被传递给TableInputFormat,并限制MapClass所能看到的信息
                                                                       MapClass.class, 
                                                                       ImmutableBytesWritable.class, 
                                                                       ImmutableBytesWritable.class,
                                                                       job);
            TableMapReduceUtil.initTableReducerJob("tag_friend",
                                                                        Reduce.class,
                                                                        job);
            System.exit(job.waitForCompletion(true)?0:1);
            return 0;
        }

        public static void main(String[] args) throws Exception{
            int res = ToolRunner.run(new Configuration(), new TestHBaseMP(), args);
            System.exit(res);
        }
    }

  • 相关阅读:
    python中的__init__
    python中的单例模式
    python中闭包和装饰器
    sql多表查询
    configurationChanges
    excludeFromRecents标签
    activity-alias
    meta-data
    launchMode
    Apache ant 配置
  • 原文地址:https://www.cnblogs.com/liangzh/p/2457703.html
Copyright © 2011-2022 走看看