zoukankan      html  css  js  c++  java
  • 入门程序 ==> WordCount

    package com.rabbit.hadoop.mapreduce;

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.omg.CORBA.ARG_IN;

    /**
    * MapReduce
    * Word counter
    */

    public class WordCountMapReduce extends Configured implements Tool{

    // 1. Map Class
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text mapOutPutKey = new Text();
    private final static IntWritable mapOutPutValue = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    throws IOException, InterruptedException {

    // read one line <0,hadoop yarn> 偏移量,值
    String lineValue = value.toString(); // 获取值

    // words split
    StringTokenizer stringTokenizer = new StringTokenizer(lineValue);

    //iterator
    while (stringTokenizer.hasMoreTokens()) {
    String word = stringTokenizer.nextToken();

    mapOutPutKey.set(word);

    context.write(mapOutPutKey, mapOutPutValue);

    }
    }


    }

    // 2. Reduce Class
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outputValue = new IntWritable();

    @Override
    protected void reduce(Text keys, Iterable<IntWritable> values,
    Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

    // <hadoop,1> <yarn,1> <hadoop,1> <hive,1> <hadoop,1>...... -> 分组 <hadoop,list(1,1,1)>
    // list中的元素相加,求和,就是"hadoop"这个单词的出现次数。
    System.out.println("input key: "+keys.toString());
    System.out.println("::::::::::::::::::::::::::::::::::::::");

    int sum = 0;
    for (IntWritable value : values) {
    System.out.println("recursively print key: "+keys.toString()+" value is: "+value.get());
    sum += value.get();
    }

    outputValue.set(sum);

    context.write(keys, outputValue);
    }


    }

    // 3. Driver
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    // 1. get configuration

    Configuration configuration = getConf();

    // 2. create job : parameters: configuration, jobName
    Job job = Job.getInstance(configuration,this.getClass().getSimpleName());

    // 3.run jars
    job.setJarByClass(this.getClass());

    // 4. set job:
    // input -> map -> reduce -> output
    // input
    Path inPath = new Path(args[0]);

    FileInputFormat.addInputPath(job, inPath);

    // set mapper class,output key, output value
    job.setMapperClass(WordCountMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // set reducer class,output key, output value
    job.setReducerClass(WordCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // output
    Path outPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outPath);

    // 5. submit job
    boolean isSuccess = job.waitForCompletion(true);

    return isSuccess? 0 : 1;

    }

    // run program
    public static void main(String[] args) throws Exception {

    Configuration configuration = new Configuration();
    // int status = new WordCountMapReduce().run(args);
    // configuration.set("mapreduce.framework.name", "yarn");  表示提交到yarn上面执行
    args = new String[] {"D:\input\1.txt","D:\output2"};

    int status = ToolRunner.run(configuration, new WordCountMapReduce(), args);

    System.exit(status);
    }

    }

    ======================================================================================================

    MapReduce的写法比较固定,主要就是重写map()和 reduce()两个方法,在方法里面实现自己的逻辑。

    整个mr程序执行的过程可以归结为5步: input -> map -> shuffle -> reduce -> output

    shuflle阶段可以说是带reduce的mr程序的核心了。之所以说是带reduce的mr的核心,是因为mr可以没有reduce方法,但一定有map方法。

    shuffle的过程中包含了排序、溢写、分区、合并、分组这些操作。

    mr在读数据的时候是按键值对的形式读取的,key即为每一行开头在整个文件中的偏移量,value即为这一行的文本内容。

    比如 :

    Hadoop  Hadoop Hive

    Spark Hadoop Hive

    读到mr中的格式为 <0,"Hadoop  Hadoop Hive"> , <19,"Spark Hadoop Hive">  。回车符也是一个字符,所以第二行的起始位置是19

    我们要统计每个单词出现的次数,首先需要将单个的单词按照空格切割出来。将单个单词以键值对的形式写出,得到这种形式:<Hadoop,1> <Hadoop,1> <Hadoop,1> <Hive,1> <Hive,1><Spark,1> 。这个逻辑在map()中实现,可以理解为对于读到的每一行都做相同的处理。

    每个map会溢写出一个或者多个小文件,在溢写之前会在内存中将key按字典序排序。如果指定了多个reduce,那么还会做分区。分区编号可以简单理解为 key的hashcode 对reduce的个数取模。

    假如有两个分区,那么每个key的分区编号就为 key.hashcode() % 2 ,每个数字对2取模只有0和1两个值,就代表了两个分区。

    接下来,每个map溢写出的小文件会合并成大文件。合并的过程中还是会按照key的字典序排序,并且保留分区。

    经过这些过程,最终,每个map会有一个合并后的大文件。

    接下来,reduce会主动去到这些大文件中拉取属于自己分区的数据,并且将拿到的分散的数据再做合并,形成一个文件。

    接着,分组操作来了。我们目前的数据形式还是 <Hadoop,1> <Hadoop,1> <Hadoop,1> <Hive,1> <Hive,1><Spark,1> 。 经过分组操作之后,就会变成这种形式:<Hadoop,[1,1,1]> <Hive,[1,1]> <Spark,[1]>

    所以reduce方法的第二个参数是 Iterable<>,它是一个迭代器。实现累加就可以计算出key出现的次数了。

  • 相关阅读:
    TypeError: translate() takes exactly one argument (2 given)
    matlab为long term visual tracking数据集生成groundtruth.txt
    Linux下为python3安装opencv
    tensorflow全连接层降维
    MDNet结果json文件转成long term visual tracking (oxuva)评估所需的csv文件的python脚本
    no module named caffe
    IIS短文件/文件夹泄露漏洞
    点击劫持漏洞
    WPF Combobox数据绑定 Binding
    关于Win10安装vs2013简体中文语言包无法安装的问题
  • 原文地址:https://www.cnblogs.com/rabbit624/p/10551557.html
Copyright © 2011-2022 走看看