zoukankan      html  css  js  c++  java
  • Hadoop- Wordcount程序原理及代码实现

    如果对Hadoop- MapReduce分布式计算框架原理还不熟悉的可以先了解一下它,因为本文的wordcount程序实现就是MapReduce分而治之最经典的一个范例。

    单词计数(wordcount)主要步骤:

    1.读数据
    2.按行处理
    3.按空格切分行内单词
    4.HashMap(单词,value+1)
    等分给自己的数据片全部读取完之后
    5.将HashMap按照首字母范围分为3个HashMap
    6.将3个hashMap分别传给3个ReduceTask
     
    主要流程如下图:

    代码实现:

     理解了原理,那么就从一个Job开始,从分Map任务和Reduce任务开始。用户编写的程序分为三个部分:Mapper,Reducer,Driver。

    Mapper的输入数据和输出数据是KV对的形式(KV的类型可自定义),Mapper的业务逻辑是写在map()方法中,map()方法(maptask进程)对每一个<k,v>调用一次

    Reducer的输入数据类型对应Mapper的输出数据类型,也是KV。Reducer的业务逻辑写在reduce()方法中,Reduce()方法对每一组相同的<k,v>组调用一次。

    用户的Mapper和Reduce都要继承各自的父类。

    整个程序需要一个Driver来进行提交,提交的是一个描述了各种必要信息的job对象。 

    1.设定Map任务:

    package cn.Rz_Lee.hadoop.com.mr.wordcount;
    
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Created by Rz_Lee on 2017/8/14.
     * KEYIN:默认情况下是mr框架所读到的一行文本的偏移量,Long
     * 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable
     *
     * VALUE:默认情况下是mr框架所读到的一行文本内容,String,同上用Text
     *
     *KEYOUT:是用户自定义逻辑处理写成之后输出数据中的key,在此是单词,String,同上,用Text
     *VALUEOUT:是用户自定义逻辑处理写成之后输出数据中的value,在此处是单词总次数,Integer,同上,用IntWritale
     *
     */
    public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        /**
         * map阶段的业务逻辑就写在自定义的map()方法中
         * maptask会对每一行输入数据调用一次我们自定义的map()方法
         * @param key
         * @param value
         * @param context 输出内容
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //将maptask传给我们的文本内容先转换成String
            String line = value.toString();
            //根据空格将一行切分成单词
            String[] words = line.split(" ");
    
            //将单词输出为<单词,1>
            for(String word:words)
            {
                //将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发经便于相同单词会到相同的reduce task
                context.write(new Text(word),new IntWritable(1));
            }
        }
    }
    

    2.设定Reduce任务:

    package cn.Rz_Lee.hadoop.mr.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**KEYIN,VALUEIN 对应mapper输出的KEYOUT,VALUEOUT类型对应
     *
     * KYEOUT,VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型
     * KYEOUT是单词
     * VALUE是总次数
     * Created by Rz_Lee on 2017/8/14.
     */
    public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        /**
         *
         * @param key 是一组相同单词KV对的key,如<hi,1>,<hi,1>
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count=0;
            for (IntWritable value:values)
            {
                count+=value.get();
            }
            context.write(key,new IntWritable(count));
        }
    }
    

    3.wordcount程序的操作类,提交运行mr程序的yarn客户端:

    package cn.Rz_Lee.hadoop.com..mr.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    /**相当于一个yarn集群的客户端
     * 需要在此封装我们的mr程序相关运行参数,指定jar包
     * 最后提交给yarn
     * Created by Rz_Lee on 2017/8/14.
     */
    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            /*conf.set("mapreduce.framework.name","yarn");
            conf.set("yarn.resourcemanager.hostname","srv01");*/
    
            /*job.setJar("/usr/hadoop/wc.jar");*/
            //指定本程序的jar包所在的本地路径
            job.setJarByClass(WordCountDriver.class);
    
    
            //指定本业务job使用的mapper/reducer业务类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
    
            //指定mapper输出数据的KV类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //指定最终输出的数据的KV类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //指定job的输入原始文件所在目录
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            //指定job的输出结果所在目录
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
            /*job.submit();*/
            boolean res = job.waitForCompletion(true);
            System.exit(res?0:1);
        }
    }
    

    4.把wordcount项目导成jar包,上传到HDFS,运行 hadoop jar wordcount.jar 包.类名 /源文件路径 /输出数据文件夹 

     在yarn上面运行: yarn jar wordcount.jar 包.类名 /源文件路径 /输出数据文件夹 

    打开浏览器输入:yarn节点的IP:8088 ,在网页上可以看见整个Job的运行情况。  

     

  • 相关阅读:
    解决PKIX:unable to find valid certification path to requested target 的问题
    Linux 上的常用文件传输方式介绍与比较
    用VNC远程图形化连接Linux桌面的配置方法
    红帽中出现”This system is not registered with RHN”的解决方案
    linux安装时出现your cpu does not support long mode的解决方法
    CentOS SSH配置
    es6扩展运算符及rest运算符总结
    es6解构赋值总结
    tortoisegit安装、clon、推送
    es6环境搭建
  • 原文地址:https://www.cnblogs.com/RzCong/p/7362365.html
Copyright © 2011-2022 走看看