  • HBase learning notes (4): integrating HBase with Hadoop (a WordCount example)

    1. Approach

      HBase ships with MapReduce support: it provides the TableMapper and TableReducer classes, and we only need to extend them to read from or write to HBase in a MapReduce job.
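
      This WordCount job reads plain text from HDFS, so only the reducer extends an HBase class. For the read-from-HBase direction, a minimal TableMapper sketch (not used in this post; the column family "cf" and qualifier "ct" here are assumptions matching the table used later) could look like this:

    package com.bjsxt.wc;
    
    import java.io.IOException;
    
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    
    // Read-side sketch only: emits (cell value, 1) for every row scanned from HBase.
    public class ScanMapper extends TableMapper<Text, IntWritable> {
    
        @Override
        protected void map(ImmutableBytesWritable rowKey, Result columns, Context context)
                throws IOException, InterruptedException {
            // Assumes a column family "cf" with qualifier "ct"; adjust to your table.
            byte[] value = columns.getValue(Bytes.toBytes("cf"), Bytes.toBytes("ct"));
            if (value != null) {
                context.write(new Text(Bytes.toString(value)), new IntWritable(1));
            }
        }
    }

      Such a mapper would be wired into the job with TableMapReduceUtil.initTableMapperJob rather than FileInputFormat.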

    2. Preparation

      First, upload some test text to HDFS. For this program the test file is /user/root/test.txt, which is the path the driver in step 3 reads; see the example commands at the end of this step.

      Then create a table in HBase; a single column family is enough. The code below uses the table wc with column family cf, as in the example commands that follow.
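
      For reference, the preparation can be done from the command line roughly as follows (a sketch: the local file name test.txt is an assumption; the table name wc and column family cf match the code in step 3):

    # upload the test text to HDFS
    hdfs dfs -put test.txt /user/root/test.txt
    
    # create the output table with a single column family in the HBase shell
    hbase shell
    create 'wc', 'cf'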

    3. Implementation

      The mapper code:

    package com.bjsxt.wc;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split each input line on spaces and emit (word, 1) for every token.
            String[] strs = value.toString().split(" ");
            for (String string : strs) {
                context.write(new Text(string), new IntWritable(1));
            }
        }
    }

      The TableReducer implementation:

    package com.bjsxt.wc;
    
    import java.io.IOException;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    
    public class WCReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    
        @Override
        protected void reduce(Text text, Iterable<IntWritable> iterable, Context context)
                throws IOException, InterruptedException {
    
            // Sum the counts emitted for this word by the mappers.
            int sum = 0;
            for (IntWritable it : iterable) {
                sum += it.get();
            }
            // One row per word: row key = the word, column cf:ct = the total count.
            Put put = new Put(Bytes.toBytes(text.toString()));
            // addColumn replaces the deprecated Put.add(family, qualifier, value).
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("ct"), Bytes.toBytes(String.valueOf(sum)));
            // TableOutputFormat ignores the output key, so null is fine here.
            context.write(null, put);
    
        }
    }

      The driver class (note: when configuring the reduce side, use TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job, null, null, null, null, false), with the last argument set to false):

    package com.bjsxt.wc;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class WCRunner {
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            // HDFS NameNode address and the ZooKeeper quorum used by HBase
            conf.set("fs.defaultFS", "hdfs://node01:8020");
            conf.set("hbase.zookeeper.quorum", "node02,node03,node04");
            Job job = Job.getInstance(conf);
            job.setJarByClass(WCRunner.class);
    
            // set the mapper and its output key/value types
            job.setMapperClass(WCMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // The last argument (addDependencyJars) is set to false, so HBase dependency
            // jars are not shipped with the job and must already be on the classpath.
            // Simpler overload: TableMapReduceUtil.initTableReducerJob(table, reducer, job);
            TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job, null, null, null, null, false);
            // input: the test text uploaded to HDFS in step 2
            FileInputFormat.addInputPath(job, new Path("/user/root/test.txt"));
            job.waitForCompletion(true);
        }
    }
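
      To run the job, package the classes into a jar, submit it with hadoop jar, and then check the result table from the HBase shell. A rough sketch (the jar name wc.jar is an assumption):

    hadoop jar wc.jar com.bjsxt.wc.WCRunner
    
    # each word becomes a row key, with its count stored in cf:ct
    hbase shell
    scan 'wc'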