zoukankan      html  css  js  c++  java
  • Hadoop之WordCount

    求平均数是MapReduce比较常见的算法,求平均数的算法也比较简单,一种思路是Map端读取数据,在数据输入到Reduce之前先经过shuffle,将map函数输出的key值相同的所有的value值形成一个集合value-list,然后将输入到Reduce端,Reduce端汇总并且统计记录数,然后作商即可。具体原理如下图所示:

    系统环境

    Linux Centos 7

    jdk 1.8

    hadoop-3.2

    IDEA代码编译器

    实验任务:

    求平均数是MapReduce比较常见的算法,求平均数的算法也比较简单,一种思路是Map端读取数据,在数据输入到Reduce之前先经过shuffle,将map函数输出的key值相同的所有的value值形成一个集合value-list,然后将输入到Reduce端,Reduce端汇总并且统计记录数,然后作商即可。具体原理如下图所示:

    商品分类 商品点击次数
    52127    5
    52120    93
    52092    93
    52132    38
    52006    462
    52109    28
    52109    43
    52132    0
    52132    34
    52132    9
    52132    30
    52132    45
    52132    24
    52009    2615
    52132    25
    52090    13
    52132    6
    52136    0
    52090    10
    52024    347

    要求使用mapreduce统计出每类商品的平均点击次数。

    结果数据如下:

    商品分类 商品平均点击次数
    52006    462
    52009    2615
    52024    347
    52090    11
    52092    93
    52109    35
    52120    93
    52127    5
    52132    23
    52136    0

    步骤:

    1.切换到指定目录,启动集群

      开启hadoop集群,本地安装的为高可用主从二节点的hadoop集群,集成了各项大数据组件。

      先开启zookeeper,再开启hdfs,再开启yarn。

      倘若本地安装的是普通分布式或伪分布式集群,直接./start-all.sh启动集群即可。

    2.在linux将数据集上传到hdfs中

    hadoop fs -mkdir -p /mymapreduce4/in  
    hadoop fs -put /data/mapreduce4/goods_click /mymapreduce4/in 
    10181    1000481    2010-04-04 16:54:31
    20001    1001597    2010-04-07 15:07:52
    20001    1001560    2010-04-07 15:08:27
    20042    1001368    2010-04-08 08:20:30
    20067    1002061    2010-04-08 16:45:33
    20056    1003289    2010-04-12 10:50:55
    20056    1003290    2010-04-12 11:57:35
    20056    1003292    2010-04-12 12:05:29
    20054    1002420    2010-04-14 15:24:12
    20055    1001679    2010-04-14 19:46:04
    20054    1010675    2010-04-14 15:23:53
    20054    1002429    2010-04-14 17:52:45
    20076    1002427    2010-04-14 19:35:39
    20054    1003326    2010-04-20 12:54:44
    20056    1002420    2010-04-15 11:24:49
    20064    1002422    2010-04-15 11:35:54
    20056    1003066    2010-04-15 11:43:01
    20056    1003055    2010-04-15 11:43:06
    20056    1010183    2010-04-15 11:45:24
    20056    1002422    2010-04-15 11:45:49
    20056    1003100    2010-04-15 11:45:54
    20056    1003094    2010-04-15 11:45:57
    20056    1003064    2010-04-15 11:46:04
    20056    1010178    2010-04-15 16:15:20
    20076    1003101    2010-04-15 16:37:27
    20076    1003103    2010-04-15 16:37:05
    20076    1003100    2010-04-15 16:37:18
    20076    1003066    2010-04-15 16:37:31
    20054    1003103    2010-04-15 16:40:14
    20054    1003100    2010-04-15 16:40:16

    3.创建java工程,将jar包导入进去

      为了避免版本冲突,和不必要的麻烦,可将hadoop目录下share/hadoop文件中的所有jar包导入进去。

    Mapper代码<<<<
    public
    static class Map extends Mapper<Object , Text , Text , IntWritable>{ private static Text newKey=new Text(); //实现map函数 public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ // 将输入的纯文本文件的数据转化成String String line=value.toString(); System.out.println(line); String arr[]=line.split(" "); newKey.set(arr[0]); int click=Integer.parseInt(arr[1]); context.write(newKey, new IntWritable(click)); } }
    Reduce代码<<<<<
    public
    static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{ //实现reduce函数 public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ int num=0; int count=0; for(IntWritable val:values){ num+=val.get(); //每个元素求和num count++; //统计元素的次数count } int avg=num/count; //计算平均数 context.write(key,new IntWritable(avg)); } }

    完整代码如下:

    package mapreduce;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    public class MyAverage{
        public static class Map extends Mapper<Object , Text , Text , IntWritable>{
        private static Text newKey=new Text();
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
        String line=value.toString();
        System.out.println(line);
        String arr[]=line.split("	");
        newKey.set(arr[0]);
        int click=Integer.parseInt(arr[1]);
        context.write(newKey, new IntWritable(click));
        }
        }
        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
        public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
            int num=0;
            int count=0;
            for(IntWritable val:values){
            num+=val.get();
            count++;
            }
            int avg=num/count;
            context.write(key,new IntWritable(avg));
            }
            }
            public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
            Configuration conf=new Configuration();
            System.out.println("start");
            Job job =new Job(conf,"MyAverage");
            job.setJarByClass(MyAverage.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path in=new Path("hdfs://localhost:9000/mymapreduce4/in/goods_click");
            Path out=new Path("hdfs://localhost:9000/mymapreduce4/out");
            FileInputFormat.addInputPath(job,in);
            FileOutputFormat.setOutputPath(job,out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
            }
            }

    4.执行

      执行方式有两种

      ①直接在本地运行,前提要在本地配置好hadoop环境变量,直接运行即可。

      ②将此文件打包成jar包,上传到linux中再,用命令运行。

    hadoop jar /apps/hadoop/hadoop-mapreduce.jar wordcount /in /out  

    查看运行结果:

    hadoop fs -ls /mymapreduce4/out
    hadoop fs -cat /mymapreduce4/out/part-r-00000

     

  • 相关阅读:
    【学习笔记/题解】树上启发式合并/CF600E Lomsat gelral
    【学习笔记/题解】虚树/[SDOI2011]消耗战
    【题解】 [GZOI2017]小z玩游戏
    【题解】CF1426E Rock, Paper, Scissors
    【题解】CF1426D Non-zero Segments
    【题解】NOIP2018 填数游戏
    【题解】NOIP2018 旅行
    【题解】NOIP2018 赛道修建
    【题解】时间复杂度
    【题解】「MCOI-02」Convex Hull 凸包
  • 原文地址:https://www.cnblogs.com/jake-jin/p/11865387.html
Copyright © 2011-2022 走看看