zoukankan      html  css  js  c++  java
  • MapReduce小试牛刀-WordCount实战

    MapReduce是一种分布式计算模式,由Google提出,主要用于搜索领域,解决海量数据的计算问题,mapRedcure是分布式运行的,分为map阶段和Reduce阶段。
    Map阶段是一个独立的程序,可在多个节点同时运行,每个节点处理一部分数据。
    Reduce阶段也是一个独立的程序,可以再一个或者多个节点同时运行,每个节点处理一部分数据。
    观摩笔记

    WordCount实战

    package com.imooc.mr;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    /**
     * 需求:读取hdfs上的hello word.txt文件,计算文件中每个单词出现的总次数
     *
     * 原始文件hello.txt内容如下:
     * hello you
     * hello me
     *
     * 最终需要的结果形式如下:
     * hello 2
     * me 1
     * you 1
     *
     *
     * @author zhangshao
     * @date 2021/3/28 4:59 下午
     */
    public class WordCountJob {
        public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
            /**
             * 需要实现map函数,
             * 这个map函数就是可以接收<k1,v1>,产生<K2,v2>
             * @param k1 代表每一行数据的行首偏移量,
             * @param v1 v1代表的是每一行内容
             * @param context
             * @throws IOException
             * @throws InterruptedException
             */
            @Override
            protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
                String[] words = v1.toString().split(" ");
                for(String word:words){
                    //把迭代出来的单词封装<k2,v2>的形式
                    Text k2 = new Text(word);
                    LongWritable v2 = new LongWritable(1L);
                    //把<k2,v2>写出去
                    context.write(k2,v2);
                }
            }
        }
    
        /**
         * reduce 阶段
         */
        public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
    
    
            @Override
            public void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
                //创建一个sum变量,保存v2s之和
                long sum = 0L;
                for(LongWritable v2:v2s){
                    sum +=v2.get();
                }
                //组装k3,v3
                Text k3 = k2;
                LongWritable v3 = new LongWritable(sum);
                //把结果写出去
                context.write(k3,v3);
            }
        }
    
        /**
         * 组装jon = map+reduce
         */
        public static void main(String[] args) {
            try{
    
                if(args.length!=2){
                    //如果传递的参数不够,程序直接退出
                    System.exit(100);
    
                }
    
                //指定job需要的配置参数
                Configuration conf = new Configuration();
                //创建一个job
                Job job = Job.getInstance(conf);
                //注意:这一行必须设置,否则在集群中执行的时候找不到WordCountJob这个类
                job.setJarByClass(WordCountJob.class);
    
                //指定输入路径(可以是文件,也可以是路径)
                FileInputFormat.setInputPaths(job,new Path(args[0]));
                //输出路径(只能指定一个不存在的路径)
                FileOutputFormat.setOutputPath(job,new Path(args[1]));
    
                //指定map相关的代码
                job.setMapperClass(MyMapper.class);
                //指定k2的类型
                job.setMapOutputKeyClass(Text.class);
                //指定v2的类型
                job.setMapOutputValueClass(LongWritable.class);
    
                //指定reduce相关的类型
                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
    
                //提交job
                job.waitForCompletion(true);
    
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    
    
    }
    
    

    主代码开发完毕后,需要打jar包到集群上去执行,需要在pom中添加maven的编译打包插件。

    <build>
            <plugins>
                <!-- compiler插件, 设定JDK版本 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <encoding>UTF-8</encoding>
                        <source>1.8</source>
                        <target>1.8</target>
                        <showWarnings>true</showWarnings>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass></mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    

    注意:在添加完build相关编译依赖后,还需要在pom文件中的hadoop-client和log4j依赖中增加scope属性,值为provided,表示只在编译的时候使用该依赖,在执行及打包的时候不使用,因为hadoop-client和log4j依赖在集群中都是有的,打jar的时候不需要打进去。

    <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.2.0</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.29</version>
                <scope>provided</scope>
            </dependency>
            <dependency>    
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.29</version>
                <scope>provided</scope>
            </dependency>
    

    之后使用编译命令打包或idea中点击install

    mvn clean package -DskipTests
    

    随后将jar上传到集群中。
    创建hello.txt文件

    [root@bigdata01 ~]# vi hello.txt
    hello you
    hello me
    

    接下来就可以向集群中提交MapReduce任务了。
    具体命令如下:

    hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out
    
    • hadoop:表示使用hadoop脚本提交任务,其实这里也可以使用yarn,从hadoop2.x开始支持yarn,不过也兼容hadoop1
    • jar:表示执行jar包
    • db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar:表示具体的jar包路径
    • com.imooc.mr.WordCountJob:表示要执行的mapreduce代码的全路径
    • /test/hello.txt:指定mapreduce接收到的第一个参数,代表的是输入路径。
    • /out:指定mapreduce接收到的第二个参数,代表的是输出目录,这里的输出目录必须是不存在的,MapReduce程序在执行之前会检测这个输出目录,如果存在会报错,因为每次执行任务都需要创建一个新的输出目录来存储结果数据。

    查看任务执行状态

    2021-04-03 09:56:53,269 INFO mapreduce.Job:  map 0% reduce 0%
    2021-04-03 09:56:59,409 INFO mapreduce.Job:  map 100% reduce 0%
    2021-04-03 09:57:05,462 INFO mapreduce.Job:  map 100% reduce 100%
    2021-04-03 09:57:06,488 INFO mapreduce.Job: Job job_1617414772389_0001 completed successfully
    

    也可以在web界面上查看执行情况。
    访问地址:http://bigdata01:8088

    查看执行结果

    [root@bigdata01 ~]# hdfs dfs -ls /out/
    Found 2 items
    -rw-r--r--   1 root supergroup          0 2021-04-03 09:57 /out/_SUCCESS
    -rw-r--r--   1 root supergroup         19 2021-04-03 09:57 /out/part-r-00000
    

    _SUCCESS文件是一个标记文件,代表该任务执行成功。
    此外,part后面的-r表示这个结果文件是由reduce步骤产生的,如果一个mapreduce只有map阶段没有reduce阶段,那么产生的结果文件是part-m-xxxx

    [root@bigdata01 ~]# hdfs dfs -cat /out/part-r-00000
    hello   2
    me      1
    you     1
    
  • 相关阅读:
    CodeForces 660D Number of Parallelograms
    【POJ 1082】 Calendar Game
    【POJ 2352】 Stars
    【POJ 2481】 Cows
    【POJ 1733】 Parity Game
    【NOI 2002】 银河英雄传说
    【NOI 2015】 程序自动分析
    【POJ 1704】 Georgia and Bob
    【HDU 2176】 取(m堆)石子游戏
    【SDOI 2016】 排列计数
  • 原文地址:https://www.cnblogs.com/shine-rainbow/p/wordcountcount.html
Copyright © 2011-2022 走看看