zoukankan      html  css  js  c++  java
  • Hadoop框架:MapReduce基本原理和入门案例

    本文源码:GitHub·点这里 || GitEE·点这里

    一、MapReduce概述

    1、基本概念

    Hadoop核心组件之一:分布式计算的方案MapReduce,是一种编程模型,用于大规模数据集的并行运算,其中Map(映射)和Reduce(归约)。

    MapReduce既是一个编程模型,也是一个计算组件,处理的过程分为两个阶段,Map阶段:负责把任务分解为多个小任务,Reduce负责把多个小任务的处理结果进行汇总。其中Map阶段主要输入是一对Key-Value,经过map计算后输出一对Key-Value值;然后将相同Key合并,形成Key-Value集合;再将这个Key-Value集合转入Reduce阶段,经过计算输出最终Key-Value结果集。

    2、特点描述

    MapReduce可以实现基于上千台服务器并发工作,提供很强大的数据处理能力,如果其中单台服务挂掉,计算任务会自动转义到另外节点执行,保证高容错性;但是MapReduce不适应于实时计算与流式计算,计算的数据是静态的。

    二、操作案例

    1、流程描述

    数据文件一般以CSV格式居多,数据行通常以空格分隔,这里需要考虑数据内容特点;

    文件经过切片分配在不同的MapTask任务中并发执行;

    MapTask任务执行完毕之后,执行ReduceTask任务,依赖Map阶段的数据;

    ReduceTask任务执行完毕后,输出文件结果。

    2、基础配置

    hadoop:
      # 读取的文件源
      inputPath: hdfs://hop01:9000/hopdir/javaNew.txt
      # 该路径必须是程序运行前不存在的
      outputPath: /wordOut
    

    3、Mapper程序

    public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text mapKey = new Text();
        IntWritable mapValue = new IntWritable(1);
    
        @Override
        protected void map (LongWritable key, Text value, Context context)
                            throws IOException, InterruptedException {
            // 1、读取行
            String line = value.toString();
            // 2、行内容切割,根据文件中分隔符
            String[] words = line.split(" ");
            // 3、存储
            for (String word : words) {
                mapKey.set(word);
                context.write(mapKey, mapValue);
            }
        }
    }
    

    4、Reducer程序

    public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        int sum ;
        IntWritable value = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,Context context)
                            throws IOException, InterruptedException {
            // 1、累加求和统计
            sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }
            // 2、输出结果
            value.set(sum);
            context.write(key,value);
        }
    }
    

    5、执行程序

    @RestController
    public class WordWeb {
    
        @Resource
        private MapReduceConfig mapReduceConfig ;
    
        @GetMapping("/getWord")
        public String getWord () throws IOException, ClassNotFoundException, InterruptedException {
            // 声明配置
            Configuration hadoopConfig = new Configuration();
            hadoopConfig.set("fs.hdfs.impl",
                    org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
            );
            hadoopConfig.set("fs.file.impl",
                    org.apache.hadoop.fs.LocalFileSystem.class.getName()
            );
            Job job = Job.getInstance(hadoopConfig);
    
            // Job执行作业 输入路径
            FileInputFormat.addInputPath(job, new Path(mapReduceConfig.getInputPath()));
            // Job执行作业 输出路径
            FileOutputFormat.setOutputPath(job, new Path(mapReduceConfig.getOutputPath()));
    
            // 自定义 Mapper和Reducer 两个阶段的任务处理类
            job.setMapperClass(WordMapper.class);
            job.setReducerClass(WordReducer.class);
    
            // 设置输出结果的Key和Value的类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //执行Job直到完成
            job.waitForCompletion(true);
            return "success" ;
        }
    }
    

    6、执行结果查看

    将应用程序打包放到hop01服务上执行;

    java -jar map-reduce-case01.jar
    

    三、案例分析

    1、数据类型

    Java数据类型与对应的Hadoop数据序列化类型;

    Java类型 Writable类型 Java类型 Writable类型
    String Text float FloatWritable
    int IntWritable long LongWritable
    boolean BooleanWritable double DoubleWritable
    byte ByteWritable array DoubleWritable
    map MapWritable

    2、核心模块

    Mapper模块:处理输入的数据,业务逻辑在map()方法中完成,输出的数据也是KV格式;

    Reducer模块:处理Map程序输出的KV数据,业务逻辑在reduce()方法中;

    Driver模块:将程序提交到yarn进行调度,提交封装了运行参数的job对象;

    四、序列化操作

    1、序列化简介

    序列化:将内存中对象转换为二进制的字节序列,可以通过输出流持久化存储或者网络传输;

    反序列化:接收输入字节流或者读取磁盘持久化的数据,加载到内存的对象过程;

    Hadoop序列化相关接口:Writable实现的序列化机制、Comparable管理Key的排序问题;

    2、案例实现

    案例描述:读取文件,并对文件相同的行做数据累加计算,输出计算结果;该案例演示在本地执行,不把Jar包上传的hadoop服务器,驱动配置一致。

    实体对象属性

    public class AddEntity implements Writable {
    
        private long addNum01;
        private long addNum02;
        private long resNum;
    
        // 构造方法
        public AddEntity() {
            super();
        }
        public AddEntity(long addNum01, long addNum02) {
            super();
            this.addNum01 = addNum01;
            this.addNum02 = addNum02;
            this.resNum = addNum01 + addNum02;
        }
    
        // 序列化
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(addNum01);
            dataOutput.writeLong(addNum02);
            dataOutput.writeLong(resNum);
        }
        // 反序列化
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            // 注意:反序列化顺序和写序列化顺序一致
            this.addNum01  = dataInput.readLong();
            this.addNum02 = dataInput.readLong();
            this.resNum = dataInput.readLong();
        }
        // 省略Get和Set方法
    }
    

    Mapper机制

    public class AddMapper extends Mapper<LongWritable, Text, Text, AddEntity> {
    
        Text myKey = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
    
            // 读取行
            String line = value.toString();
    
            // 行内容切割
            String[] lineArr = line.split(",");
    
            // 内容格式处理
            String lineNum = lineArr[0];
            long addNum01 = Long.parseLong(lineArr[1]);
            long addNum02 = Long.parseLong(lineArr[2]);
    
            myKey.set(lineNum);
            AddEntity myValue = new AddEntity(addNum01,addNum02);
    
            // 输出
            context.write(myKey, myValue);
        }
    }
    

    Reducer机制

    public class AddReducer extends Reducer<Text, AddEntity, Text, AddEntity> {
    
        @Override
        protected void reduce(Text key, Iterable<AddEntity> values, Context context)
                throws IOException, InterruptedException {
    
            long addNum01Sum = 0;
            long addNum02Sum = 0;
    
            // 处理Key相同
            for (AddEntity addEntity : values) {
                addNum01Sum += addEntity.getAddNum01();
                addNum02Sum += addEntity.getAddNum02();
            }
    
            // 最终输出
            AddEntity addRes = new AddEntity(addNum01Sum, addNum02Sum);
            context.write(key, addRes);
        }
    }
    

    案例最终结果:

    五、源代码地址

    GitHub·地址
    https://github.com/cicadasmile/big-data-parent
    GitEE·地址
    https://gitee.com/cicadasmile/big-data-parent
    

    推荐阅读:编程体系整理

    序号 项目名称 GitHub地址 GitEE地址 推荐指数
    01 Java描述设计模式,算法,数据结构 GitHub·点这里 GitEE·点这里 ☆☆☆☆☆
    02 Java基础、并发、面向对象、Web开发 GitHub·点这里 GitEE·点这里 ☆☆☆☆
    03 SpringCloud微服务基础组件案例详解 GitHub·点这里 GitEE·点这里 ☆☆☆
    04 SpringCloud微服务架构实战综合案例 GitHub·点这里 GitEE·点这里 ☆☆☆☆☆
    05 SpringBoot框架基础应用入门到进阶 GitHub·点这里 GitEE·点这里 ☆☆☆☆
    06 SpringBoot框架整合开发常用中间件 GitHub·点这里 GitEE·点这里 ☆☆☆☆☆
    07 数据管理、分布式、架构设计基础案例 GitHub·点这里 GitEE·点这里 ☆☆☆☆☆
    08 大数据系列、存储、组件、计算等框架 GitHub·点这里 GitEE·点这里 ☆☆☆☆☆
  • 相关阅读:
    arduino电子艺术PWM直流电机电调实验
    横坐标轴移动位置
    将不才则三军倾
    Source Insight常用快捷键及注释快捷键设置
    dos2unix批量转换的脚本
    win8: ListView
    win8: Asynchronous Programming in JavaScript with “Promises”
    WindJS 中的$await
    iphone:关于沙盒 存储路径
    win8: hello gril
  • 原文地址:https://www.cnblogs.com/cicada-smile/p/14021923.html
Copyright © 2011-2022 走看看