zoukankan      html  css  js  c++  java
  • MapReduce框架结构及代码示例

    一个完整的 mapreduce 程序在分布式运行时有三类实例进程:

    1、MRAppMaster:负责整个程序的过程调度及状态协调

    2、MapTask:负责 map 阶段的整个数据处理流程

    3、ReduceTask:负责 reduce 阶段的整个数据处理流程

     

     

     

    设计构思

        MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop 集群上。

        既然是做计算的框架,那么表现形式就是有个输入(input),MapReduce 操作这个输入(input),通过本身定义好的计算模型,得到一个输出(output)。

        对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce 就是一种简化并行计算的编程模型,降低了开发并行应用的入门门槛。

      Hadoop MapReduce 构思体现在如下的三个方面:

    • 如何对付大数据处理:分而治之

        对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略。并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!

    • 构建抽象模型:Map 和 Reduce

        MapReduce 借鉴了函数式语言中的思想,用 Map 和 Reduce 两个函数提供了高层的并行编程抽象模型。

        Map: 对一组数据元素进行某种重复式的处理;

        Reduce: 对 Map 的中间结果进行某种进一步的结果整理。

        MapReduce 中定义了如下的 Map 和 Reduce 两个抽象的编程接口,由用户去编程实现:

          map: (k1; v1) → [(k2; v2)]

          reduce: (k2; [v2]) → [(k3; v3)]

        Map 和 Reduce 为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出 MapReduce 处理的数据类型是<key,value>键值对

    • 统一构架,隐藏系统层细节

        如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce 设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。

        MapReduce 最大的亮点在于通过抽象模型和计算框架把需要做什么(whatneed to do)与具体怎么做(how to do)分开了,为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用。

     

    代码示例

     

    • pom依赖
    <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.4</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.7.4</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.4</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.7.4</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>2.4</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>lib/</classpathPrefix>
                                <mainClass>hadoop.mr.wc.WordCountDriver</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    • Mapper类
    /**
     * mr程序执行的时候mapper阶段运行的类,也就是maptask
     */
    public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        //该方法为map阶段具体的业务逻辑的实现
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //获取传入的一行内容
            String line = value.toString();
            //按照分隔符切割数据返回数组
            String[] words = line.split(" ");
            //遍历数组
            for (String word : words) {
                //每出现一个单词都标记1
                context.write(new Text(word),new IntWritable(1));
            }
        }
    }
    • Reducer类
    /**
     * mr程序执行的时候reducer阶段运行的类,也就是reducertask
     */
    public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        //该方法为reducer阶段具体的业务逻辑的实现
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //定义一个变量统计
            int count = 0;
            //遍历所有value所在的迭代器  累加构成该单词最终的总个数
            for (IntWritable value : values) {
                count += value.get();
            }
            //相同key的一组调用reduce完毕,直接输出
            context.write(key,new IntWritable(count));
        }
    }
    • 入口函数
      /**
       * mr程序运行时的主类,除了入口函数之外,还要对mr程序做具体描述
       */
      public class WordCountDriver {
          public static void main(String[] args) throws Exception{
              Configuration conf = new Configuration();
              //指定mr程序使用本地模式模拟一套环境执行mr程序,一般用于本地代码测试
      //        conf.set("mapreduce.framework.name","local");
      
              //通过job方法获得mr程序运行的实例
              Job job = Job.getInstance(conf);
      
              //指定本次mr程序的运行主类
              job.setJarByClass(WordCountDriver.class);
              //指定本次mr程序使用的mapper reduce
              job.setMapperClass(WordCountMapper.class);
              job.setReducerClass(WordCountReducer.class);
      
              //指定本次mr程序map输出的数据类型
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(IntWritable.class);
      
              //指定本次mr程序reduce输出的数据类型,也就是说最终的输出类型
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
      
              //指定本次mr程序待处理数据目录   输出结果存放目录
              FileInputFormat.addInputPath(job,new Path("/wordcount/input"));
              FileOutputFormat.setOutputPath(job,new Path("/wordcount/output"));
      //        FileInputFormat.addInputPath(job,new Path("D:\wordcount\input"));
      //        FileOutputFormat.setOutputPath(job,new Path("D:\wordcount\output"));
      
              //提交本次mr程序
              boolean b = job.waitForCompletion(true);
              System.exit(b ? 0 : 1);//程序执行成功,退出状态码为0,退出程序,否则为1
          }
      }
    • 测试

        入口类中被注释的部分为本地测试方法,也就是在windows指定路径中准备测试数据,直接run执行,

        而另一种方法是将代码打成jar包上传到集群中,在hdfs上指定路径准备数据,使用hadoop命令启动

    hadoop jar wordcount.jar

     

     

     

     

     

     

     

     

  • 相关阅读:
    show processlist 输出ID 和 information_schema.PROCESSLIST 的id,information_schema.innodb_trx的TRX_MYSQL_T
    用 Flask 来写个轻博客 (15) — M(V)C_实现博文页面评论表单
    第十二章 对象(下)
    十大最值得关注的国内大数据分析厂商
    第十二章 对象(上)
    mysql 区间锁 对于没有索引 非唯一索引 唯一索引 各种情况
    insert into select 堵塞update
    监控持有sql和被堵塞的sql
    人生应该有间隔年——北漂18年(75)
    ERROR: transport error 202: connect failed: Connection timed out
  • 原文地址:https://www.cnblogs.com/jifengblog/p/9277253.html
Copyright © 2011-2022 走看看