zoukankan      html  css  js  c++  java
  • 第一个MapReduce的例子

    第一个MapReduce的例子

    Hadoop Guide的第一个MapReduce的例子是处理气象数据的(数据来源ncdc),终于跑通了。总结一下步骤,安装hadoop不在本文中介绍

    1 数据预处理

    1.1 下载数据

    测试数据需要在ncdc的官方ftp上进行下载,年份跨度范围1901到2016,不写个脚本下载,靠手工是行不通的,脚本如下:
    download.sh

    !bin/bash
    for i in {1901..2015}
    do
    wget --execute robots=off -r -np -nH -P./ncdc/ --cut-dirs=4 -R index.html* ftp://ftp.ncdc.noaa.gov/pub/data/gsod/$i/gsod_$i.tar
    done
    

    执行命令[nohup sh download.sh &] 会把所有ftp上gsod_*.tar的压缩包下载下来,下载完之后需要预处理这些数据

    1.2 数据预处理

    每一个tar压缩包包含n个gz压缩包,每一个gz包含一个数据文本,关于ncdc气象数据的每个字段的描述在这里,格式举例如下

    STN--- WBAN   YEARMODA    TEMP       DEWP      SLP        STP       VISIB      WDSP     MXSPD   GUST    MAX     MIN   PRCP   SNDP   FRSHTT
    607450 99999  20100101    56.1 22    33.0 22  1012.4  8   975.5  8    5.6 22    9.1 22   19.0  999.9    63.9    48.2*  0.00G 999.9  000000
    607450 99999  20100102    53.0 23    34.2 23  1019.5  8   982.2  8    5.8 23    6.7 23   12.0  999.9    66.7    39.0   0.00G 999.9  000000
    607450 99999  20100103    50.5 23    34.3 23  1022.2  8   984.6  8    6.2 23    7.7 23   12.0  999.9    64.9    36.5   0.00G 999.9  000000
    607450 99999  20100104    53.0 22    34.5 22  1016.5  8   979.3  8    6.4 22    6.5 22   15.9  999.9    64.9    42.8*  0.00G 999.9  000000
    

    预处理的目标是把压缩包里面的数据按照一个年份一个txt的形式存在,为了Map阶段读书去数据方便,去除第一行的title。处理脚本如下,processh.sh

    #!/bin/bash
    
    for i in {1901..2017}
    do
      tar xf ./ncdc/gsod_$i.tar -C ./ncdc
      gunzip ./ncdc/*.gz
    
      rm -rf ncdc/input_gsod_$i.txt
      touch ncdc/input_gsod_$i.txt
    
      for file in ./ncdc/*.op
      do
        sed -i '1d' $file
        cat $file >> ./ncdc/input_gsod_$i.txt
      done
      rm -rf ./ncdc/*.op
      echo "file gsod_$i has processed "
    done
    

    1.3 Load数据到HDFS上

    创建input目录:

    hdfs dfs -mkdir /ncdc
    

    put数据到hdfs上:

    hdfs dfs -put ./ncdc/*.txt /ncdc/*
    

    检查hdfs上的数据, 如果所有的年份的数据都load到了hdfs上就OK

    hdfs dfs -ls /ncdc
    

    2 MapReduce程序

    2.1 MapReduce程序

    package com.oldtrafford.hadoop;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    
    public class MaxTemperature {
    
        public static void main (String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            if(args.length!=2){
                System.err.println("usage: maxtemperature <input>");
                System.exit(-1);
            }
    
            Job job = new Job();
            job.setJarByClass(MaxTemperature.class);
            job.setJobName("MaxTemperature");
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setMapperClass(MaxTemperatureMapper.class);
            job.setReducerClass(MaxTemperatureReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            job.waitForCompletion(true);
            System.out.println("Finished");
        }
    
        static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            private static final String MISSING = "9999.9";
            @Override
            public void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String year = line.substring(14, 18);
    
                String tempretureStr = line.substring(24,30).trim();
                int temperature = -1;
                if(!MISSING.equals(tempretureStr)){
                    temperature = (int)(Double.parseDouble(tempretureStr)*10);
                    context.write(new Text(year), new IntWritable(temperature));
                }
            }
        }
    
        static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int maxValue = Integer.MIN_VALUE;
                for(IntWritable value : values) {
                    maxValue = Math.max(maxValue, value.get());
                }
                context.write(key, new IntWritable(maxValue));
            }
        }
    }
    

    2.2 打包代码&&执行

    打包代码
    打包mapreduce的代码用最简单的maven命令,会产生一个jar包。然后把这个jar传输到hadoop机器的上一台机器上。

    执行hadoop

    hadoop jar temperature.jar com.oldtrafford.hadoop.MaxTemperature /ncdc/* /ncdc_output
    

    3 查看执行结果

    查看mapreduce结果

    hdfs dfs -cat /ncdc_output/*
    
  • 相关阅读:
    JavaScript:Number 对象
    JavaScript:Math 对象
    杂项:引用资源列表
    小团队管理与大团队管理
    技术转管理
    【翻译+整理】.NET Core的介绍
    自己开发给自己用的个人知识管理工具【脑细胞】,源码提供
    关于【自证清白】
    这篇博客能让你戒烟——用程序员的思维来戒烟!
    如果我是博客园的产品经理【下】
  • 原文地址:https://www.cnblogs.com/oldtrafford/p/7648631.html
Copyright © 2011-2022 走看看