zoukankan      html  css  js  c++  java
  • Hadoop MapReduce编程 API入门系列之wordcount版本5(九)

      这篇博客,给大家,体会不一样的版本编程。

    代码

    package zhouls.bigdata.myMapReduce.wordcount1;

    import java.io.IOException;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    //4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型
    //map 和 reduce 的数据输入输出都是以 key-value对的形式封装的
    //默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value
    public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    //mapreduce框架每读一行数据就调用一次该方法
    @Override
    protected void map(LongWritable key, Text value,Context context)
    throws IOException, InterruptedException {
    //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中 key-value
    //key 是这一行数据的起始偏移量 value 是这一行的文本内容

    //将这一行的内容转换成string类型
    String line = value.toString();

    //对这一行的文本按特定分隔符切分
    String[] words = StringUtils.split(line, " ");

    //遍历这个单词数组输出为kv形式 k:单词 v : 1
    for(String word : words){

    context.write(new Text(word), new LongWritable(1));

    }

    }



    }

    package zhouls.bigdata.myMapReduce.wordcount1;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{



    //框架在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法
    //<hello,{1,1,1,1,1,1.....}>
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context)
    throws IOException, InterruptedException {

    long count = 0;
    //遍历value的list,进行累加求和
    for(LongWritable value:values){

    count += value.get();
    }

    //输出这一个单词的统计结果

    context.write(key, new LongWritable(count));

    }

    }

    package zhouls.bigdata.myMapReduce.wordcount1;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import zhouls.bigdata.myMapReduce.Anagram.Anagram;

    /**
    * 用来描述一个特定的作业
    * 比如,该作业使用哪个类作为逻辑处理中的map,哪个作为reduce
    * 还可以指定该作业要处理的数据所在的路径
    * 还可以指定改作业输出的结果放到哪个路径
    * ....
    *
    *
    */
    public class WCRunner implements Tool {
    public int run(String[] arg0) throws Exception {
    Configuration conf = new Configuration();
    //2删除已经存在的输出目录
    Path mypath = new Path(arg0[1]);//下标为1,即是输出路径
    FileSystem hdfs = mypath.getFileSystem(conf);//获取文件系统
    if (hdfs.isDirectory(mypath))
    {//如果文件系统中存在这个输出路径,则删除掉
    hdfs.delete(mypath, true);
    }

    Job wcjob = new Job(conf, "WC");//构建一个job对象,取名为testAnagram

    //设置整个job所用的那些类在哪个jar包
    wcjob.setJarByClass(WCRunner.class);


    //本job使用的mapper和reducer的类
    wcjob.setMapperClass(WCMapper.class);
    wcjob.setReducerClass(WCReducer.class);


    //指定reduce的输出数据kv类型
    wcjob.setOutputKeyClass(Text.class);
    wcjob.setOutputValueClass(LongWritable.class);

    //指定mapper的输出数据kv类型
    wcjob.setMapOutputKeyClass(Text.class);
    wcjob.setMapOutputValueClass(LongWritable.class);





    FileInputFormat.addInputPath(wcjob, new Path(arg0[0]));// 文件输入路径
    FileOutputFormat.setOutputPath(wcjob, new Path(arg0[1]));// 文件输出路径
    //将job提交给集群运行
    wcjob.waitForCompletion(true);

    return 0;

    }


    public static void main(String[] args) throws Exception
    {//定义数组来保存输入路径和输出路径
    //集群路径
    // String[] args0 = { "hdfs://HadoopMaster:9000/wc.txt",
    // "hdfs://HadoopMaster:9000/out/wc/"};

    //本地路径
    String[] args0 = { "./data/wc.txt",
    "out/wc/"};

    int ec = ToolRunner.run( new Configuration(), new WCRunner(), args0);
    System. exit(ec);
    }


    @Override
    public Configuration getConf() {
    // TODO Auto-generated method stub
    return null;
    }


    @Override
    public void setConf(Configuration arg0) {
    // TODO Auto-generated method stub

    }


    }

  • 相关阅读:
    js中当call或者apply传入的第一个参数是null/undefined时,js函数内执行的上下文环境是什么?
    闭包的实现原理和作用以及堆栈溢出和内存泄漏原理和相应解决办法
    JavaScript的作用域和作用域链
    词法作用域和动态作用域
    理解 es6 中class构造以及继承的底层实现原理
    new一个对象的详细过程,手动实现一个 new操作符
    实现继承的几种方式以及他们的优缺点
    理解JavaScript的执行上下文栈,可以应用堆栈信息快速定位问题
    原型和原型链-instanceof的底层实现原理
    js判断变量未定义
  • 原文地址:https://www.cnblogs.com/zlslch/p/6163830.html
Copyright © 2011-2022 走看看