
    First MapReduce Example

    The first MapReduce example in the Hadoop Guide processes weather data (sourced from NCDC), and I finally got it running. This post summarizes the steps; installing Hadoop itself is not covered here.

    1 Data Preprocessing

    1.1 Downloading the Data

    The test data must be downloaded from NCDC's official FTP site and spans the years 1901 to 2016. Downloading it by hand is impractical, so use a script:
    download.sh

    #!/bin/bash
    for i in {1901..2015}
    do
    wget --execute robots=off -r -np -nH -P./ncdc/ --cut-dirs=4 -R index.html* ftp://ftp.ncdc.noaa.gov/pub/data/gsod/$i/gsod_$i.tar
    done
    

    Run it with [nohup sh download.sh &]; this downloads every gsod_*.tar archive from the FTP site. Once the download finishes, the data needs to be preprocessed.
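Once the script finishes, it is worth confirming that one archive arrived per year (115 files for 1901-2015). A minimal sketch of that check, run here against placeholder files in a temporary directory rather than the real download:

```shell
#!/bin/bash
# Stand-in for the real ./ncdc directory: one empty .tar per year.
dir=$(mktemp -d)
for i in {1901..2015}; do touch "$dir/gsod_$i.tar"; done

# Count the archives; 1901-2015 inclusive is 115 years.
count=$(ls "$dir"/gsod_*.tar | wc -l | tr -d ' ')
echo "archives: $count"
rm -rf "$dir"
```

Pointing the same `ls | wc -l` count at the real ./ncdc directory shows whether any year's download failed.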

    1.2 Preprocessing the Data

    Each tar archive contains many gz archives, and each gz archive holds a single data text file. The per-field description of the NCDC weather data is available here; the format looks like this:

    STN--- WBAN   YEARMODA    TEMP       DEWP      SLP        STP       VISIB      WDSP     MXSPD   GUST    MAX     MIN   PRCP   SNDP   FRSHTT
    607450 99999  20100101    56.1 22    33.0 22  1012.4  8   975.5  8    5.6 22    9.1 22   19.0  999.9    63.9    48.2*  0.00G 999.9  000000
    607450 99999  20100102    53.0 23    34.2 23  1019.5  8   982.2  8    5.8 23    6.7 23   12.0  999.9    66.7    39.0   0.00G 999.9  000000
    607450 99999  20100103    50.5 23    34.3 23  1022.2  8   984.6  8    6.2 23    7.7 23   12.0  999.9    64.9    36.5   0.00G 999.9  000000
    607450 99999  20100104    53.0 22    34.5 22  1016.5  8   979.3  8    6.4 22    6.5 22   15.9  999.9    64.9    42.8*  0.00G 999.9  000000
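
As a quick check of the fixed-width layout (the same character offsets the mapper below relies on), the year and TEMP fields can be sliced out of one sample record; a shell sketch:

```shell
#!/bin/bash
# One record from the sample above (fixed-width GSOD layout).
line='607450 99999  20100101    56.1 22    33.0 22  1012.4  8   975.5  8    5.6 22    9.1 22   19.0  999.9    63.9    48.2*  0.00G 999.9  000000'

year=${line:14:4}     # YEARMODA starts at offset 14; its first 4 chars are the year
temp=${line:24:6}     # TEMP occupies offsets 24-29
temp=${temp// /}      # strip the padding spaces
echo "$year $temp"
```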
    

    The goal of preprocessing is to merge the data into one txt file per year, and to strip the title row from each file so the map phase can read the data directly. The processing script, process.sh:

    #!/bin/bash
    
    for i in {1901..2017}
    do
      tar xf ./ncdc/gsod_$i.tar -C ./ncdc
      gunzip ./ncdc/*.gz
    
      rm -rf ncdc/input_gsod_$i.txt
      touch ncdc/input_gsod_$i.txt
    
      for file in ./ncdc/*.op
      do
        sed -i '1d' "$file"
        cat "$file" >> ./ncdc/input_gsod_$i.txt
      done
      rm -rf ./ncdc/*.op
      echo "file gsod_$i has been processed"
    done
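
A quick way to sanity-check the merged files is to confirm the header row really was stripped. A minimal sketch of that check against a synthetic two-line stand-in for one .op file (the sample content here is made up for illustration):

```shell
#!/bin/bash
# Build a tiny stand-in for one .op file: a header row plus one record.
file=$(mktemp)
printf 'STN--- WBAN   YEARMODA    TEMP\n607450 99999  20100101    56.1\n' > "$file"

sed -i '1d' "$file"          # same header-stripping step process.sh applies

first=$(head -n 1 "$file")   # after stripping, line 1 is the data record
echo "$first"
rm -f "$file"
```

Running `grep -l 'STN---' ./ncdc/input_gsod_*.txt` on the real output should likewise match nothing.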
    

    1.3 Loading the Data into HDFS

    Create the input directory:

    hdfs dfs -mkdir /ncdc
    

    Put the data files onto HDFS:

    hdfs dfs -put ./ncdc/*.txt /ncdc/
    

    Check the data on HDFS; if every year's data has been loaded, you're done:

    hdfs dfs -ls /ncdc
    

    2 The MapReduce Program

    2.1 Source Code

    package com.oldtrafford.hadoop;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    
    public class MaxTemperature {
    
        public static void main (String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            if(args.length!=2){
                System.err.println("usage: MaxTemperature <input path> <output path>");
                System.exit(-1);
            }
    
            Job job = Job.getInstance();
            job.setJarByClass(MaxTemperature.class);
            job.setJobName("MaxTemperature");
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setMapperClass(MaxTemperatureMapper.class);
            job.setReducerClass(MaxTemperatureReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            job.waitForCompletion(true);
            System.out.println("Finished");
        }
    
        static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            private static final String MISSING = "9999.9";
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                // YEARMODA starts at offset 14; its first four characters are the year.
                String year = line.substring(14, 18);

                // TEMP occupies offsets 24-29; 9999.9 marks a missing reading.
                String temperatureStr = line.substring(24, 30).trim();
                if (!MISSING.equals(temperatureStr)) {
                    // Store tenths of a degree so the reading fits in an IntWritable.
                    int temperature = (int) (Double.parseDouble(temperatureStr) * 10);
                    context.write(new Text(year), new IntWritable(temperature));
                }
            }
        }
    
        static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int maxValue = Integer.MIN_VALUE;
                for(IntWritable value : values) {
                    maxValue = Math.max(maxValue, value.get());
                }
                context.write(key, new IntWritable(maxValue));
            }
        }
    }
    

    2.2 Packaging and Running

    Build the jar
    Package the MapReduce code with a plain Maven build, which produces a jar. Then copy that jar to one of the machines in the Hadoop cluster.
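A minimal build-and-copy sequence might look like the following; the jar name matches the run command below, but the user and host names are placeholders, not from the original:

```shell
# Package with Maven; the jar is written under target/.
mvn clean package

# Copy the jar to a cluster node (user@hadoop-node is a placeholder).
scp target/temperature.jar user@hadoop-node:~/
```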

    Run the job on Hadoop:

    hadoop jar temperature.jar com.oldtrafford.hadoop.MaxTemperature /ncdc/* /ncdc_output
    

    3 Checking the Results

    View the MapReduce output:

    hdfs dfs -cat /ncdc_output/*
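
The output is one line per year, key and maximum value separated by a tab (the TextOutputFormat default). The reducer's per-year maximum can be sketched as an awk one-liner over the four sample TEMP values shown earlier (temperatures kept unscaled here for readability):

```shell
#!/bin/bash
# Reproduce the reducer's max-per-year logic on the sample TEMP values.
out=$(printf '2010 56.1\n2010 53.0\n2010 50.5\n2010 53.0\n' |
  awk '{ if (!($1 in m) || $2 > m[$1]) m[$1] = $2 }
       END { for (y in m) print y, m[y] }')
echo "$out"
```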
    
  • Original post: https://www.cnblogs.com/oldtrafford/p/7648631.html