Writing Your First MapReduce Program

    Since starting this course of study we haven't actually sat down and written any code yet, and some of you who aspire to become Hadoop engineers are probably getting impatient. The environment is set up, and frankly I can't wait any longer either. Today, let's roll up our sleeves and write our first MapReduce program together.

    As mentioned before, the core of any Hadoop program is the Mapper class, the Reducer class, and the run() method; much of the time you can simply follow an existing example. Today we'll take the basic Hadoop program template as our "gourd" and "paint a ladle" from it, that is, write a MapReduce program by copying the pattern.

    The Hadoop Program Template (the "Gourd")

    Data source: weather readings from hundreds of US weather stations. A few sample lines from one station look like this:

    1985 07 31 02   200    94 10137   220    26     1     0 -9999
    1985 07 31 03   172    94 10142   240     0     0     0 -9999
    1985 07 31 04   156    83 10148   260    10     0     0 -9999
    1985 07 31 05   133    78 -9999   250     0 -9999     0 -9999
    1985 07 31 06   122    72 -9999    90     0 -9999     0     0
    1985 07 31 07   117    67 -9999    60     0 -9999     0 -9999
    1985 07 31 08   111    61 -9999    90     0 -9999     0 -9999
    1985 07 31 09   111    61 -9999    60     5 -9999     0 -9999
    1985 07 31 10   106    67 -9999    80     0 -9999     0 -9999
    1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999
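
    Each line is a fixed-width record: year, month, day, hour, followed by the air temperature in the fifth field, with -9999 marking a missing value. As a minimal parsing sketch (the character offsets below mirror the Mapper in the template code, not an official format description):

    String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
    // characters 14..18 hold the temperature field; trim() strips the padding
    int temperature = Integer.parseInt(line.substring(14, 19).trim()); // 200
    // a value of -9999 would mean the reading is missing and must be skipped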

    Requirement: using this data, compute the 30-year average temperature for each US weather station. Part of the output looks like this:

    03103    82        // 03103 is the weather station id; 82 is the average temperature (Fahrenheit)
    03812    128
    03813    178
    03816    143
    03820    173
    03822    189
    03856    160
    03860    130
    03870    156
    03872    108
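
    Note that the averages are whole numbers: the Reducer below divides one int by another, so the fractional part is truncated. A quick illustration of the same arithmetic:

    int[] readings = {200, 172, 156, 133, 122}; // valid readings for one station
    int sum = 0;
    for (int r : readings) {
        sum += r;
    }
    System.out.println(sum / readings.length); // prints 156, not 156.6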

    The Hadoop template program:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    /**
     * Computes each US weather station's average temperature over 30 years.
     */
    public class Temperature extends Configured implements Tool {
    
        public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            @Override
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // sample record: 1985 07 31 02  200 94 10137 220 26 1 0 -9999
                String line = value.toString(); // one line of input
                int temperature = Integer.parseInt(line.substring(14, 19).trim()); // temperature field
                if (temperature != -9999) { // filter out missing readings
                    FileSplit fileSplit = (FileSplit) context.getInputSplit();
                    // derive the weather station id from the input file name
                    String weatherStationId = fileSplit.getPath().getName().substring(5, 10);
                    // map output: (stationId, temperature)
                    context.write(new Text(weatherStationId), new IntWritable(temperature));
                }
            }
        }
    
        public static class TemperatureReducer extends
                Reducer<Text, IntWritable, Text, IntWritable> {

            private IntWritable result = new IntWritable();

            @Override
            public void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {

                int sum = 0;
                int count = 0;
                // sum all temperature readings for the same weather station
                for (IntWritable val : values) {
                    sum += val.get();
                    count++;
                }
                // integer average for this station (the fractional part is truncated)
                result.set(sum / count);
                // reduce output: key = weatherStationId, value = mean(temperature)
                context.write(key, result);
            }
        }
    
    
    
        /**
         * Job driver method: configures and submits the MapReduce job.
         * @param args input path (args[0]) and output path (args[1])
         * @return exit code (0 on success, 1 on failure)
         * @throws Exception
         */
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = new Configuration(); // load the configuration

            Path mypath = new Path(args[1]);
            FileSystem hdfs = mypath.getFileSystem(conf);
            if (hdfs.exists(mypath)) { // delete the output path if it already exists
                hdfs.delete(mypath, true);
            }

            Job job = Job.getInstance(conf, "temperature"); // create a new job
            job.setJarByClass(Temperature.class); // main class

            FileInputFormat.addInputPath(job, new Path(args[0])); // input path
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path

            job.setMapperClass(TemperatureMapper.class); // Mapper
            job.setReducerClass(TemperatureReducer.class); // Reducer

            job.setOutputKeyClass(Text.class); // key type of the final output
            job.setOutputValueClass(IntWritable.class); // value type of the final output

            return job.waitForCompletion(true) ? 0 : 1; // submit the job and wait for it
        }
    
    
        /**
         * main method: launches the job through ToolRunner with hard-coded HDFS paths.
         * @param args ignored; the paths are supplied via args0
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            String[] args0 = {
                    "hdfs://single.hadoop.dajiangtai.com:9000/weather/",
                    "hdfs://single.hadoop.dajiangtai.com:9000/weather/out/"
                    };
            int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
            System.exit(ec);
        }
    }
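
    To try the program yourself, compile it, package it into a jar, and submit it with the hadoop command. A minimal sketch, assuming the source is saved as Temperature.java and the input files are already uploaded under /weather/ on HDFS as expected by main():

    javac -classpath `hadoop classpath` Temperature.java
    jar cf Temperature.jar Temperature*.class
    hadoop jar Temperature.jar Temperature

    When the job finishes, the per-station averages land in /weather/out/part-r-00000 (a single reduce task by default), which you can inspect with hdfs dfs -cat.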

     
