zoukankan      html  css  js  c++  java
  • Hadoop- Wordcount程序原理及代码实现

    如果对Hadoop- MapReduce分布式计算框架原理还不熟悉的可以先了解一下它,因为本文的wordcount程序实现就是MapReduce分而治之最经典的一个范例。

    单词计数(wordcount)主要步骤:

    1.读数据
    2.按行处理
    3.按空格切分行内单词
    4.HashMap(单词,value+1)
    等分给自己的数据片全部读取完之后
    5.将HashMap按照首字母范围分为3个HashMap
    6.将3个hashMap分别传给3个ReduceTask
     
    主要流程如下图:

    代码实现:

     理解了原理,那么就从一个Job开始,从分Map任务和Reduce任务开始。用户编写的程序分为三个部分:Mapper,Reducer,Driver。

    Mapper的输入数据和输出数据是KV对的形式(KV的类型可自定义),Mapper的业务逻辑是写在map()方法中,map()方法(maptask进程)对每一个<k,v>调用一次

    Reducer的输入数据类型对应Mapper的输出数据类型,也是KV。Reducer的业务逻辑写在reduce()方法中,Reduce()方法对每一组相同的<k,v>组调用一次。

    用户的Mapper和Reduce都要继承各自的父类。

    整个程序需要一个Driver来进行提交,提交的是一个描述了各种必要信息的job对象。 

    1.设定Map任务:

    package cn.Rz_Lee.hadoop.com.mr.wordcount;
    
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Created by Rz_Lee on 2017/8/14.
     * KEYIN:默认情况下是mr框架所读到的一行文本的偏移量,Long
     * 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable
     *
     * VALUE:默认情况下是mr框架所读到的一行文本内容,String,同上用Text
     *
     *KEYOUT:是用户自定义逻辑处理写成之后输出数据中的key,在此是单词,String,同上,用Text
     *VALUEOUT:是用户自定义逻辑处理写成之后输出数据中的value,在此处是单词总次数,Integer,同上,用IntWritale
     *
     */
    public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        /**
         * map阶段的业务逻辑就写在自定义的map()方法中
         * maptask会对每一行输入数据调用一次我们自定义的map()方法
         * @param key
         * @param value
         * @param context 输出内容
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //将maptask传给我们的文本内容先转换成String
            String line = value.toString();
            //根据空格将一行切分成单词
            String[] words = line.split(" ");
    
            //将单词输出为<单词,1>
            for(String word:words)
            {
                //将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发经便于相同单词会到相同的reduce task
                context.write(new Text(word),new IntWritable(1));
            }
        }
    }
    

    2.设定Reduce任务:

    package cn.Rz_Lee.hadoop.mr.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**KEYIN,VALUEIN 对应mapper输出的KEYOUT,VALUEOUT类型对应
     *
     * KYEOUT,VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型
     * KYEOUT是单词
     * VALUE是总次数
     * Created by Rz_Lee on 2017/8/14.
     */
    public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        /**
         *
         * @param key 是一组相同单词KV对的key,如<hi,1>,<hi,1>
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count=0;
            for (IntWritable value:values)
            {
                count+=value.get();
            }
            context.write(key,new IntWritable(count));
        }
    }
    

    3.wordcount程序的操作类,提交运行mr程序的yarn客户端:

    package cn.Rz_Lee.hadoop.com..mr.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    /**相当于一个yarn集群的客户端
     * 需要在此封装我们的mr程序相关运行参数,指定jar包
     * 最后提交给yarn
     * Created by Rz_Lee on 2017/8/14.
     */
    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            /*conf.set("mapreduce.framework.name","yarn");
            conf.set("yarn.resourcemanager.hostname","srv01");*/
    
            /*job.setJar("/usr/hadoop/wc.jar");*/
            //指定本程序的jar包所在的本地路径
            job.setJarByClass(WordCountDriver.class);
    
    
            //指定本业务job使用的mapper/reducer业务类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            //指定mapper输出数据的KV类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //指定最终输出的数据的KV类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //指定job的输入原始文件所在目录
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            //指定job的输出结果所在目录
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
            /*job.submit();*/
            boolean res = job.waitForCompletion(true);
            System.exit(res?0:1);
        }
    }
    

    4.把wordcount项目导成jar包,上传到HDFS,运行 hadoop jar wordcount.jar 包.类名 /源文件路径 /输出数据文件夹 

     在yarn上面运行: yarn jar wordcount.jar 包.类名 /源文件路径 /输出数据文件夹 

    打开浏览器输入:yarn节点的IP:8088 ,在网页上可以看见整个Job的运行情况。  

     

  • 相关阅读:
    POJ1239
    HDU 2829 四边形不等式优化
    返回数字二进制的最高位位数o(n)
    矩阵快速幂 模板
    HDU4718 The LCIS on the Tree(LCT)
    HDU4010 Query on The Trees(LCT)
    HDU3487 Play With Chains(Splay)
    CF444C DZY Loves Colors
    HDU4836 The Query on the Tree(树状数组&&LCA)
    HDU4831&&4832&&4834
  • 原文地址:https://www.cnblogs.com/RzCong/p/7362365.html
Copyright © 2011-2022 走看看