zoukankan      html  css  js  c++  java
  • Hadoop—MapReduce计算气象温度

    Hadoop—MapReduce计算气象温度

    1 运行环境说明

    1.1 硬软件环境

    • 主机操作系统:Mac OS 64 bit ,8G内存
    • 虚拟软件:Parallers Desktop12
    • 虚拟机操作系统:CentOS 64位,单核,512内存
    • JDK:java version "1.7.0_45"
    • Hadoop:1.1.2

    1.2 机器网络环境

    集群包含三个节点:1个namenode、2个datanode,其中节点之间可以相互ping通。节点IP地址和主机名分布如下:

    序号 IP地址 机器名 类型 用户名 运行进程
    1 192.168.33.200 Master 名称节点 haha NN、SNN、JobTracer
    2 192.168.33.201 Slave1 数据节点 haha DN、TaskTracer
    3 192.168.33.202 Slave2 数据节点 haha DN、TaskTracer
    4 192.168.33.203 Slave3 数据节点 haha DN、TaskTracer

    所有节点均是CentOS6.5 64bit系统,防火墙均禁用,所有节点上均创建了一个haha用户,用户主目录是/home/haha。

    2 使用MapReduce求每年最低温度

    2.1 内容

    下载气象数据集部分数据,写一个Map-Reduce作业,求每年的最低温度,部署并运行之.

    分析Map-Reduce过程

    Map-Reduce编程模型

    2.1.1 Map-reduce的思想就是“分而治之”

    • Mapper

      Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”执行
      “简单的任务”有几个含义:

      • 1 数据戒计算规模相对于原任务要大大缩小;
      • 2 就近计算 ,即会被分配到存放了所需数据的节点进行计算;
      • 3 这些小任务可以幵行计算,彼此间几乎没有依赖关系
    • Reducer

      对map阶段的结果进行汇总

      • Reducer的数目由mapred-site.xml配置文件里的项目mapred.reduce.tasks决定。缺 省值为1,用户可以覆盖之

    2.2 运行代码

    2.2.1 MinTemperature

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
    publicclass MinTemperature {
       
        public staticvoid main(String[] args) throws Exception {
            if(args.length != 2) {
                System.err.println("Usage: MinTemperature<input path> <output path>");
                System.exit(-1);
            }
           
            Job job = new Job();
            job.setJarByClass(MinTemperature.class);
            job.setJobName("Min temperature");
            //new Path(args[0])控制台的第一个参数--输入路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            //new Path(args[1])控制台的第二个参数--输出路径
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            //指定Mapper是哪个类
            job.setMapperClass(MinTemperatureMapper.class);
            //指定Reducer是哪个类
            job.setReducerClass(MinTemperatureReducer.class);
            //指定输出的key和value是什么
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    
    

    2.2.2 MinTemperatureMapper

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
     
    public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
     
        private static final int MISSING = 9999;
       
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           
            String line = value.toString();
            String year = line.substring(15, 19);
           
            int airTemperature;
            if(line.charAt(87) == '+') {
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
           
            String quality = line.substring(92, 93);
            if(airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }
    

    2.2.3 MinTemperatureReducer

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
     
    public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
     
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
           
            int minValue = Integer.MAX_VALUE;
            for(IntWritable value : values) {
                minValue = Math.min(minValue, value.get());
            }
            context.write(key, new IntWritable(minValue));
        }
    }
    

    2.3 实现过程

    2.3.1 编写代码

    进入/home/haha/hadoop-1.1.2/myclass目录,在该目录中建立MinTemperature.JavaMinTemperatureMapper.javaMinTemperatureReducer.java代码文件,代码内容为2.2所示,执行命令如下:

    [haha@Master ~]$cd /home/haha/hadoop-1.1.2/myclass/
    [haha@Master myclass]$vi MinTemperature.java
    [haha@Master myclass]$vi MinTemperatureMapper.java
    [haha@Master myclass]$vi MinTemperatureReducer.java
    

    MinTemperature.java

    MinTemperatureMapper.java

    MinTemperatureReducer.java

    2.3.2编译代码

    在/home/haha/hadoop-1.1.2/myclass目录中,使用如下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:

    [haha@Master myclass]$javac -classpath ../hadoop-core-1.1.2.jar *.java
    [haha@Master myclass]$ls
    [haha@Master myclass]$mv *.jar
    [haha@Master myclass]$rm *.class
    

    2.3.4创建目录

    进入/home/haha/hadoop-1.1.2/bin目录,在HDFS中创建气象数据存放路径/user/haha/in,执行命令如下:

    cd /home/haha/hadoop-1.1.2/bin
    hadoop fs -mkdir /user/haha/in
    hadoop fs -ls /user/haha
    

    2.3.5解压气象数据并上传到HDFS中

    使用SSH工具或者scp命令把从NCDC下载的气象数据上传到上步骤创建的目录/user/haha/in中。

    使用zcat命令把这些数据文件解压并合并到一个sample.txt文件中,合并后把这个文件上传到HDFS文件系统的/usr/hadoop/in目录中:

    cd /user/haha/hadoop-1.1.2/in
    zcat *.gz > sample.txt
    hadoop fs -copyFromLocal sample.txt /user/haha/in
    

    气象数据具体的下载地址为 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,该数据包括1900年到现在所有年份的气象数据,大小大概有70多个G。为了测试简单,我们这里选取一部分的数据进行测试

    2.3.6 运行程序

    以jar的方式启动MapReduce任务,执行输出目录为/user/haha/outputFile:

    
    cd /home/haha/hadoop-1.1.2
    hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/sample.txt  outputFile
    

    2.3.7查看结果

    执行成功后,查看/user/haha/outputFile目录中是否存在运行结果,使用cat查看结果:

    [haha@Master ~]$ hadoop fs -ls /user/haha/outputFile
    [haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000
    [haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000
    1972	11
    

    2.3.8通过页面结果

    1. 查看jobtracker.jsp

    http://master:50030/jobtracker.jsp


    已经完成的作业任务:

    任务的详细信息:

    2.查看dfshealth.jsp

    http://master:50070/dfshealth.jsp

    分别查看HDFS文件系统和日志

    3 求温度平均值能使用combiner吗?

    Q:如果求温度的平均值,能使用combiner吗?有没有变通的方法.

    A:不能使用,因为求平均值和前面求最值存在差异,各局部最值的最值还是等于整体的最值的,但是对于平均值而言,各局部平均值的平均值将不再是整体的平均值了,所以不能用combiner。可以通过变通的办法使用combiner来计算平均值,即在combiner的键值对中不直接存储最后的平均值,而是存储所有值的和个数,最后在reducer输出时再用和除以个数得到平均值。

    3.1 程序代码

    AvgTemperature.java

    import org.apache.hadoop.fs.Path;  
    import org.apache.hadoop.io.IntWritable;  
    import org.apache.hadoop.io.Text;  
    import org.apache.hadoop.mapreduce.Job;  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
       
    public class AvgTemperature {  
         
        public static void main(String[] args) throws Exception {  
               
            if(args.length != 2) {  
                System.out.println("Usage: AvgTemperatrue <input path><output path>");  
                System.exit(-1);  
            }  
             
            Job job = new Job();  
            job.setJarByClass(AvgTemperature.class);  
            job.setJobName("Avg Temperature");  
            FileInputFormat.addInputPath(job, new Path(args[0]));  
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  
             
            job.setMapperClass(AvgTemperatureMapper.class);  
            job.setCombinerClass(AvgTemperatureCombiner.class);  
            job.setReducerClass(AvgTemperatureReducer.class);  
             
            job.setMapOutputKeyClass(Text.class);  
            job.setMapOutputValueClass(Text.class);  
             
            job.setOutputKeyClass(Text.class);  
            job.setOutputValueClass(IntWritable.class);  
             
            System.exit(job.waitForCompletion(true) ? 0 : 1);  
        }  
    }  
    

    AvgTemperatureMapper.java

    import java.io.IOException;  
    import org.apache.hadoop.io.IntWritable;  
    import org.apache.hadoop.io.LongWritable;  
    import org.apache.hadoop.io.Text;  
    import org.apache.hadoop.mapreduce.Mapper;  
       
    publicclass AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {  
       
        private static final int MISSING = 9999;  
         
        @Override  
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{  
             
            String line = value.toString();  
            String year = line.substring(15, 19);  
             
            int airTemperature;  
            if(line.charAt(87) == '+') {  
                airTemperature = Integer.parseInt(line.substring(88, 92));  
            } else {  
                airTemperature =  Integer.parseInt(line.substring(87, 92));  
            }  
             
            String quality = line.substring(92, 93);  
            if(airTemperature != MISSING && !quality.matches("[01459]")) {  
                context.write(new Text(year), new Text(String.valueOf(airTemperature)));  
            }  
        }  
    }  
    

    AvgTemperatureCombiner.java

    import java.io.IOException;  
    import org.apache.hadoop.io.Text;  
    import org.apache.hadoop.mapreduce.Reducer;  
       
    public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{  
       
        @Override  
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
             
            double sumValue = 0;  
            long numValue = 0;  
             
            for(Text value : values) {  
                sumValue += Double.parseDouble(value.toString());  
                numValue ++;  
            }  
             
            context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue)));  
        }  
    }  
    

    AvgTemperatureReducer.java

    import java.io.IOException;  
    import org.apache.hadoop.io.IntWritable;  
    import org.apache.hadoop.io.Text;  
    import org.apache.hadoop.mapreduce.Reducer;  
       
    public class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{  
       
        @Override  
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
             
            double sumValue = 0;  
            long numValue = 0;  
            int avgValue = 0;  
             
            for(Text value : values) {  
                String[] valueAll = value.toString().split(",");  
                sumValue += Double.parseDouble(valueAll[0]);  
                numValue += Integer.parseInt(valueAll[1]);  
            }  
             
            avgValue  = (int)(sumValue/numValue);  
            context.write(key, new IntWritable(avgValue));  
        }  
    }  
    

    3.2 实现过程

    编写代码

    进入/home/haha/hadoop-1.1.2/myclass目录,在该目录中建立AvgTemperature.java、AvgTemperatureMapper.java、AvgTemperatureCombiner.java和AvgTemperatureReducer.java代码文件,执行命令如下:

    cd /usr/local/hadoop-1.1.2/myclass/
    vi AvgTemperature.java
    vi AvgTemperatureMapper.java
    vi AvgTemperatureCombiner.java
    vi AvgTemperatureReducer.java
    

    编译代码

    在/home/user/hadoop-1.1.2/myclass目录中,使用如下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:

    javac -classpath ../hadoop-core-1.1.2.jar *.java
    ls
    

    打包编译文件

    把编译好class文件打包,否则在执行过程会发生错误。把打好的包移动到上级目录并删除编译好的class文件:

    jar cvf ./AvgTemperature.jar ./*.class
    ls
    mv *.jar ..
    rm *.class
    
    

    运行程序

    数据使用求每年最低温度的气象数据,数据在HDFS位置为/user/haha/in/sample.txt,以jar的方式启动MapReduce任务,执行输出目录为/user/haha/out1:

    cd /home/haha/hadoop-1.1.2
    hadoop jar AvgTemperature.jar AvgTemperature /user/haha/in/sample.txt  /user/haha/out1
    

    查看结果

    执行成功后,查看/user/haha/out1目录中是否存在运行结果,使用cat查看结果:

    hadoop fs -ls /user/haha/out1
    hadoop fs -cat /user/haha/out1/part-r-00000
    
    

  • 相关阅读:
    2015第14周四
    2015第14周三
    2015第14周二
    2015第14周一
    2015第13周日
    2015第13周六
    2015第13周五
    2015第13周四
    2015第13周三
    2015第13周二
  • 原文地址:https://www.cnblogs.com/cmi-sh-love/p/6759016.html
Copyright © 2011-2022 走看看