  • IV. MapReduce Basics

    MapReduce is a parallel computation framework whose input sources are broad (HDFS, RDBMS, NoSQL). Hadoop's MR module makes full use of the memory, CPU, and a small amount of disk on every machine that hosts an HDFS DataNode to run distributed computations over large datasets. MapReduce splits a computation into two phases:

    1. A large computation task is split into many small tasks (by partitioning the target dataset). Each small task is assigned to a compute node (a machine hosting a DataNode), which classifies and analyzes its local slice of the data. This is called the Map phase; when a map task finishes, its intermediate results are stored on the node's local disk.
    2. Once all nodes have finished their local classification and analysis, the MR framework launches Reduce tasks that aggregate the intermediate results of the Map phase. This is called the Reduce phase. (A toy single-JVM sketch of this flow follows this list.)
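
    To make the two phases concrete, here is a toy single-JVM sketch (an illustration written for this walkthrough, not distributed code) that replays the same map/group/reduce flow on the access-log format used in section III:

    import java.util.*;

    /* Toy sketch of the two MapReduce phases in one process.
     * Real jobs run distributed across DataNode machines via YARN. */
    public class MiniMapReduce {
        public static void main(String[] args) {
            List<String> lines = Arrays.asList(
                    "192.168.0.12 1 001 click 5000 2019-01-04 14:44:00",
                    "192.168.0.13 1 002 click 5000 2019-01-04 14:45:00",
                    "192.168.0.12 1 003 click 5000 2019-01-04 14:46:00");

            // "Map" phase: emit an (ip, 1) pair per line, grouped by key.
            // The grouping stands in for MapReduce's shuffle step.
            Map<String, List<Integer>> grouped = new HashMap<>();
            for (String line : lines) {
                String ip = line.split(" ")[0];
                grouped.computeIfAbsent(ip, k -> new ArrayList<>()).add(1);
            }

            // "Reduce" phase: sum the partial results for each key.
            for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
                int total = 0;
                for (int v : e.getValue()) {
                    total += v;
                }
                System.out.println(e.getKey() + "\t" + total);
            }
        }
    }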

    I. Computation Flow

    In outline: the input dataset is cut into splits; each split feeds one map task; map output is partitioned by key and sorted (the shuffle); each reduce task fetches and merges the partitions assigned to it; and the reduce output is written back to HDFS.

    II. YARN Environment Setup

    Configuration files. yarn-site.xml registers the shuffle auxiliary service on each NodeManager and names the ResourceManager host; mapred-site.xml switches the MapReduce runtime to YARN.

    [root@CentOS ~]# vi /usr/hadoop-2.6.0/etc/hadoop/yarn-site.xml

    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!--Resource Manager-->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>CentOS</value>
    </property> 
    

    [root@CentOS ~]# mv /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml.template /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml

    [root@CentOS ~]# vi /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml

    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    

    Start the compute services

    [root@CentOS ~]# start-yarn.sh 
    [root@CentOS ~]# jps
    1584 SecondaryNameNode
    1364 NameNode
    1446 DataNode
    5229 Jps
    

    If everything started correctly, jps should also list a ResourceManager and a NodeManager process alongside the HDFS daemons shown above. Then visit the ResourceManager web UI at http://centos:8088/ (the name must resolve to the host set as yarn.resourcemanager.hostname).
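
    For a scripted check, the YARN client API can list the NodeManagers that have registered with the ResourceManager. A minimal sketch (assuming the hadoop-yarn-client 2.6.0 artifact is on the classpath; the hostname mirrors the yarn-site.xml above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.client.api.YarnClient;

    /* Sketch: confirm the ResourceManager is reachable and list its nodes. */
    public class YarnCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("yarn.resourcemanager.hostname", "CentOS");
            YarnClient yarn = YarnClient.createYarnClient();
            yarn.init(conf);
            yarn.start();
            for (NodeReport node : yarn.getNodeReports()) {
                System.out.println(node.getNodeId() + " " + node.getNodeState());
            }
            yarn.stop();
        }
    }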

    III. HelloWorld of MapReduce Programming

    Add the Hadoop 2.6.0 MapReduce client dependencies to the project's pom.xml:

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <version>2.6.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
          <version>2.6.0</version>
      </dependency>
    

    IpMapper

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    /**
     * @program: hadoop_01
     * @description: emits an (ip, 1) pair for every access-log line
     * @author: luoht
     * @create: 2019-01-04 16:08
     **/

    public class IpMapper extends Mapper<LongWritable,Text,Text,IntWritable>{

        /**
         * Sample input line: 192.168.0.12 1 001 click 5000 2019-01-04 14:44:00
         * @param key : byte offset of the line within the input split
         * @param value : the text of the input line
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Fields are space-separated; take the leading IP address.
            String[] tokens = value.toString().split(" ");
            String ip = tokens[0];
            context.write(new Text(ip),new IntWritable(1));
        }
    }
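
    One refinement worth copying from production mappers (a sketch, not part of the original post): map() runs once per input line, so hoisting the output Writables out of the method avoids two allocations per record. Hadoop serializes values at the moment write() is called, so reusing the objects across calls is safe here:

        // Sketch: reusable Writables instead of new objects per record.
        private final Text outKey = new Text();
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            outKey.set(value.toString().split(" ")[0]);
            context.write(outKey, ONE);
        }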
    

    IpReducer

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * @program: hadoop_01
     * @description: sums the per-IP counts emitted by IpMapper
     * @author: luoht
     * @create: 2019-01-04 16:13
     **/
    
    public class IpReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        /**
         *
         * @param key :ip
         * @param values: Int[]{1,1,1,..}
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable value : values) {
                total+=value.get();
            }
            context.write(key,new IntWritable(total));
    
        }
    }
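
    Because integer addition is associative and commutative, this same reducer can also run as a combiner, pre-aggregating (ip, 1) pairs on the map side so that less data crosses the network during the shuffle. An optional one-line addition to the job setup below (not in the original post):

        // Optional: pre-aggregate map output locally before the shuffle.
        job.setCombinerClass(IpReducer.class);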
    

    Wrapping the job

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * @program: hadoop_01
     * @description: assembles and submits the IP-count job
     * @author: luoht
     * @create: 2019-01-04 16:15
     **/

    public class CustomJobSubmiter extends Configured implements Tool {
        @Override
        public int run(String[] strings) throws Exception {
            /*1. Build the job object*/
            Configuration conf = getConf();
            Job job = Job.getInstance(conf);
            job.setJarByClass(CustomJobSubmiter.class);
            /*2. Set the input and output formats*/
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            /*3. Set the input and output paths (the output directory must not already exist)*/
            Path src = new Path("/tt/access");
            TextInputFormat.addInputPath(job, src);
            Path dst = new Path("/tt/result");
            TextOutputFormat.setOutputPath(job, dst);
            /*4. Set the computation logic: wire in the Mapper and Reducer*/
            job.setMapperClass(IpMapper.class);
            job.setReducerClass(IpReducer.class);
            /*5. Declare the Mapper and Reducer output types*/
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            /*6. Submit the job and wait for completion*/
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new CustomJobSubmiter(), args));
        }
    }
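
    Hard-coding the paths works for a first run, but TextOutputFormat refuses to overwrite an existing output directory, which makes reruns awkward. A common variant (a sketch, not the original author's code) reads the paths from the command line, which ToolRunner passes through to run() after stripping any -D options; step 3 above would become:

        /*3. Sketch: take the paths from the command line*/
        Path src = new Path(strings[0]);   // e.g. /tt/access
        Path dst = new Path(strings[1]);   // e.g. /tt/result
        TextInputFormat.addInputPath(job, src);
        TextOutputFormat.setOutputPath(job, dst);

    Package the three classes into a jar and submit it on the cluster with the hadoop jar command, passing the input and output paths as arguments.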