zoukankan      html  css  js  c++  java
  • hadoop2.x入门:编写mapreduce对气象数据集求每日最高气温和最低气温

    1.下载气象数据集

    气象数据集下载地址为:

    我们下载国内的气象数据,使用下面命令进行下载

    wget -D --accept-regex=REGEX -P data -r -c ftp://ftp.ncdc.noaa.gov/pub/data/noaa/2017/5*
    

    国内气象站ID区间为50001-59998详细的可以在《1951—2007年中国地面气候资料日值数据集台站信息》中查看,不过应该不全。另外《StationIDs_Global_1509》中提供了世界各国气象站编号范围。


    2.解压数据集,并保存在文本文件中

    7月23号下载的,数据量为79w行,大小为182MB。所以即使年底也不过200w行。

    [grid@tiny01 ~]$ zcat data/ftp.ncdc.noaa.gov/pub/data/noaa/2017/5*.gz > data.txt
    

    在这里>表示输出重定向符

    我们查看气象数据集:

    0169501360999992017010100004+52130+122520FM-12+043399999V0202201N0010102600199003700199-03271-03631102641ADDAA124160092AJ199999999999999GA1081+026001101GA2999+999999101GA3999+999999101GE19MSL   +99999+99999GF108991081999026001999999MA1999999096571MD1210041+0301REMSYN004BUFR
    

    对数据格式进行解释

    1-4     0169    
    5-10    501360      # USAF weather station identifier 
    11-15   99999       # WBAN weather station identifier
    16-23   20170101    # 记录日期
    24-27   0000        # 记录时间
    28      4
    29-34   +52130      # 纬度(1000倍)
    35-41   +122520     # 经度(1000倍)
    42-46   FM-12
    47-51   +0433      # 海拔(米)
    52-56   99999
    57-60   V020
    61-63   220         # 风向
    64      1           # 质量代码
    65      N
    66-69   0010
    70      1
    71-75   02600       # 云高(米)
    76      1
    77      9
    78      9
    79-84   003700      # 能见距离(米)
    85      1
    86      9
    87      9
    88-92   -0327       # 空气温度(摄氏度*10)
    93      1
    94-98   -0363       # 露点温度(摄氏度*10)
    99      1
    100-104 10264       # 大气压力
    105     1
    

    其中第5-10位表示气象站编号:501360(取前五位),查表可得对应的是黑龙江漠河。我们主要分析的是月份:16-21位和空气温度:88-92位的极值关系。


    3. 编写MapReduce程序

    Mapper程序

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class MaxTemperatureMapper extends
                    Mapper<LongWritable, Text, Text, IntWritable> {
            private static final int MISSING = 9999;
    
            @Override
            public void map(LongWritable key, Text value, Context context)
                            throws IOException, InterruptedException {
                    String line = value.toString();
                    String data = line.substring(15, 21);
                    int airTemperature;
                    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
                                                                                   // signs
                            airTemperature = Integer.parseInt(line.substring(88, 92));
                    } else {
                            airTemperature = Integer.parseInt(line.substring(87, 92));
                    }
                    String quality = line.substring(92, 93);
                    if (airTemperature != MISSING && quality.matches("[01459]")) {
                            context.write(new Text(data), new IntWritable(airTemperature));
                    }
            }
    }
    

    Reducer程序

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MaxTemperatureReducer extends
                    Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                            throws IOException, InterruptedException {
                    int maxValue = Integer.MIN_VALUE;
                    for (IntWritable value : values) {
                            maxValue = Math.max(maxValue, value.get());
                    }
                    context.write(key, new IntWritable(maxValue));
            }
    }
    

    M-R job

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    
    public class MaxTemperature extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		if (args.length != 2) {
    			System.err
    					.println("Usage: MaxTemperature <input path> <output path>");
    			System.exit(-1);
    		}
    		Configuration conf = new Configuration();
    		conf.set("mapred.jar", "MaxTemperature.jar");
    		Job job = Job.getInstance(conf);
    		job.setJarByClass(MaxTemperature.class);
    		job.setJobName("Max temperature");
    		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		job.setMapperClass(MaxTemperatureMapper.class);
    		job.setReducerClass(MaxTemperatureReducer.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    
    	@Override
    	public int run(String[] arg0) throws Exception {
    		// TODO Auto-generated method stub
    		return 0;
    	}
    }
    

    注意设置conf.set("mapred.jar", "MaxTemperature.jar");第二个参数为即将打成的jar包的名称

    4.编译java文件,打成jar包

    此编译命令为:

    [grid@tiny01 myclass]$ javac -classpath $HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.2.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.2.jar:$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar *.java
    [grid@tiny01 myclass]$ jar cvf MaxTemperature.jar *.class
    [grid@tiny01 myclass]$ ll
    total 28
    -rw-rw-r--. 1 grid grid 1413 Jul  3 16:45 MaxTemperature.class
    -rw-rw-r--. 1 grid grid 3085 Jul  9 19:04 MaxTemperature.jar
    -rw-rw-r--. 1 grid grid  949 Jun 30 15:49 MaxTemperature.java
    -rw-rw-r--. 1 grid grid 1876 Jul  3 16:45 MaxTemperatureMapper.class
    -rw-rw-r--. 1 grid grid  953 Jun 30 15:37 MaxTemperatureMapper.java
    -rw-rw-r--. 1 grid grid 1687 Jul  3 16:45 MaxTemperatureReducer.class
    -rw-rw-r--. 1 grid grid  553 Jun 30 15:47 MaxTemperatureReducer.java
    
    

    这里的classpath和之前的hadoop版本有所区别,需要按照新的设置方法,这一点网上很少提及!(注意Hadoop不同版本,包不一样)

    5.将数据上传至hdfs上

    [grid@tiny01 ~]$ hadoop fs -put data.txt /data.txt
    

    6. 运行该程序

    [grid@tiny01 ~]$ hadoop jar MaxTemperature.jar MaxTemperature /data.txt /out
    
    17/07/24 00:13:20 INFO client.RMProxy: Connecting to ResourceManager at tiny01/192.168.1.101:8032
    17/07/24 00:13:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    17/07/24 00:13:22 INFO input.FileInputFormat: Total input paths to process : 1
    17/07/24 00:13:23 INFO mapreduce.JobSubmitter: number of splits:2
    17/07/24 00:13:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1500807860144_0002
    17/07/24 00:13:24 INFO impl.YarnClientImpl: Submitted application application_1500807860144_0002
    17/07/24 00:13:24 INFO mapreduce.Job: The url to track the job: http://tiny01:8088/proxy/application_1500807860144_0002/
    17/07/24 00:13:24 INFO mapreduce.Job: Running job: job_1500807860144_0002
    17/07/24 00:13:44 INFO mapreduce.Job: Job job_1500807860144_0002 running in uber mode : false
    17/07/24 00:13:44 INFO mapreduce.Job:  map 0% reduce 0%
    17/07/24 00:14:49 INFO mapreduce.Job:  map 20% reduce 0%
    17/07/24 00:14:52 INFO mapreduce.Job:  map 33% reduce 0%
    17/07/24 00:14:55 INFO mapreduce.Job:  map 50% reduce 0%
    17/07/24 00:16:02 INFO mapreduce.Job:  map 51% reduce 0%
    17/07/24 00:16:05 INFO mapreduce.Job:  map 54% reduce 0%
    17/07/24 00:16:08 INFO mapreduce.Job:  map 57% reduce 0%
    17/07/24 00:16:11 INFO mapreduce.Job:  map 60% reduce 0%
    17/07/24 00:16:14 INFO mapreduce.Job:  map 62% reduce 0%
    17/07/24 00:16:20 INFO mapreduce.Job:  map 65% reduce 0%
    17/07/24 00:16:40 INFO mapreduce.Job:  map 69% reduce 0%
    17/07/24 00:16:42 INFO mapreduce.Job:  map 73% reduce 0%
    17/07/24 00:16:44 INFO mapreduce.Job:  map 83% reduce 0%
    17/07/24 00:16:46 INFO mapreduce.Job:  map 100% reduce 0%
    17/07/24 00:17:22 INFO mapreduce.Job:  map 100% reduce 100%
    17/07/24 00:17:30 INFO mapreduce.Job: Job job_1500807860144_0002 completed successfully
    17/07/24 00:17:32 INFO mapreduce.Job: Counters: 50
            File System Counters
                    FILE: Number of bytes read=10226664
                    FILE: Number of bytes written=20805407
                    FILE: Number of read operations=0
                    FILE: Number of large read operations=0
                    FILE: Number of write operations=0
                    HDFS: Number of bytes read=190690631
                    HDFS: Number of bytes written=77
                    HDFS: Number of read operations=9
                    HDFS: Number of large read operations=0
                    HDFS: Number of write operations=2
            Job Counters
                    Killed map tasks=1
                    Launched map tasks=3
                    Launched reduce tasks=1
                    Data-local map tasks=3
                    Total time spent by all maps in occupied slots (ms)=383699
                    Total time spent by all reduces in occupied slots (ms)=143422
                    Total time spent by all map tasks (ms)=383699
                    Total time spent by all reduce tasks (ms)=143422
                    Total vcore-milliseconds taken by all map tasks=383699
                    Total vcore-milliseconds taken by all reduce tasks=143422
                    Total megabyte-milliseconds taken by all map tasks=392907776
                    Total megabyte-milliseconds taken by all reduce tasks=146864128
            Map-Reduce Framework
                    Map input records=789998
                    Map output records=786666
                    Map output bytes=8653326
                    Map output materialized bytes=10226670
                    Input split bytes=184
                    Combine input records=0
                    Combine output records=0
                    Reduce input groups=7
                    Reduce shuffle bytes=10226670
                    Reduce input records=786666
                    Reduce output records=7
                    Spilled Records=1573332
                    Shuffled Maps =2
                    Failed Shuffles=0
                    Merged Map outputs=2
                    GC time elapsed (ms)=2436
                    CPU time spent (ms)=8470
                    Physical memory (bytes) snapshot=415924224
                    Virtual memory (bytes) snapshot=6170849280
                    Total committed heap usage (bytes)=267198464
            Shuffle Errors
                    BAD_ID=0
                    CONNECTION=0
                    IO_ERROR=0
                    WRONG_LENGTH=0
                    WRONG_MAP=0
                    WRONG_REDUCE=0
            File Input Format Counters
                    Bytes Read=190690447
            File Output Format Counters
                    Bytes Written=77
    
    

    查看结果

    [grid@tiny01 ~]$ hadoop fs -cat /out/part-r-00000
    201701  307
    201702  350
    201703  375
    201704  399
    201705  426
    201706  444
    201707  485
    
    

    由于这里的气温是摄氏度的10倍,所以看起来很大。
    我们来检查一下:

    [grid@tiny01 ~]$ hadoop fs -copyToLocal /out/part-r-00000 result.txt
    [grid@tiny01 ~]$ awk '{print $1".{66}+0"$2"1"}' result.txt |xargs -i grep --color=auto {} sample.txt | awk -v FS="" '{print substr($0,5,5),substr($0,16,6),substr($0,88,6)}'
    59158 201704 +03071
    59997 201701 +03071
    56966 201702 +03501
    56966 201703 +03751
    56966 201704 +03991
    51573 201705 +04261
    51573 201706 +04441
    51573 201706 +04441
    51573 201707 +04851
    

    正则表达式不会写,就将就着看吧,第一条是因为正则表达式匹配的问题,因此这条数据不算。但是其他条都吻合,我们可以看看这几个气象站:

    • 51573:新疆吐鲁番
    • 56966:云南元江
    • 59997:没找到

    测试成功!

  • 相关阅读:
    Codeforces Round #654 (Div. 2)A-E1
    android 学习receiver和发送广播,其中监听其他activity的启动demo;给activity加自定义权限只有指定有权限的app可以监听到
    任务栈Task的模式
    Activity生命周期学习笔记,和横竖切屏时候activity销毁时候保存数据和调用的方法
    Activity之间利用intent单个传递数据和批量传递数据
    Android学习-启动服务startActivityForResult调用activity并覆写onActivityResult()接收返回来的信息
    android学习之intent学习笔记
    android断点下载并显示进度,关于handler,和主线程不能联网采取子线程联网下载,和多线程下载学习
    Contentprovider学习笔记
    android学习之LayoutInflater的用法,在myAdapter getView()里将多个TextView组件压缩成一个View控件,并在listView里显示
  • 原文地址:https://www.cnblogs.com/erygreat/p/7225771.html
Copyright © 2011-2022 走看看