zoukankan      html  css  js  c++  java
  • MapReduce实战之MaxTemper实战

    首先上传1901年和1902年的温度数据到CentOs

     

    导出jar包

     

    启动hadoop集群

     

    在hdfs中建立文件夹:hadoop  fs  -mkdir  /user/hadoop/input

     

    上传本地文件到hdfs:hadoop  fs  -put  1901 /user/hadoop/input

     

    检查是否成功上传:hadoop  fs  -ls  /user/hadoop/input

     

    提交任务到hadoop集群:

    hadoop  jar  Temper.jar cn.edu.gznc.MaxTemper  /user/hadoop/input/1901  /user/hadoop/outputTemper

     

    任务运行中

     

    查看运行结果

     

    打印运行结果:最大温度317

     

    计算1902年一样的步骤

     

    运行完毕

     

    运行结果:1902年最大温度是228

     

    这是1901部分数据内容

    源代码:

    package cn.edu.gznc;

    import java.io.IOException; 

    import java.text.DateFormat; 

    import java.text.SimpleDateFormat; 

    import java.util.Date; 

    import org.apache.hadoop.conf.Configuration; 

    import org.apache.hadoop.conf.Configured; 

    import org.apache.hadoop.util.Tool; 

    import org.apache.hadoop.fs.Path; 

    import org.apache.hadoop.io.*; 

    import org.apache.hadoop.mapreduce.*; 

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

    import org.apache.hadoop.util.ToolRunner; 

    public class MaxTemper extends Configured implements Tool{

             enum Counter { 

            LINESKIP; 

        } 

        // map job 

        public static class Map extends 

                Mapper<LongWritable, Text, Text, IntWritable> { 

            private static final int MISSING = 9999; 

            public void map(LongWritable key, Text value, Context context) 

                    throws IOException, InterruptedException { 

                try { 

                    String line = value.toString(); 

                    System.out.println(line); 

                    String year = line.substring(15, 19); 

                    int airTemperature; 

                    if (line.charAt(87) == '+') { 

                        airTemperature = Integer.parseInt(line.substring(88, 92)); 

                    } else { 

                        airTemperature = Integer.parseInt(line.substring(87, 92)); 

                    } 

                    String quality = line.substring(92, 93); 

                    if (airTemperature != MISSING && quality.matches("[01459]")) { 

                        context.write(new Text(year), new IntWritable( 

                                airTemperature)); 

                    } 

                } catch (NumberFormatException e) { 

                    // TODO Auto-generated catch block 

                    // e.printStackTrace(); 

                    context.getCounter(Counter.LINESKIP).increment(1); 

                    return; 

                } 

            } 

        } 

        // reduce job 

        public static class Reduce extends 

                Reducer<Text, IntWritable, Text, IntWritable> { 

            public void reduce(Text key, Iterable<IntWritable> values, 

                    Context context) throws IOException, InterruptedException { 

                int maxValue = Integer.MIN_VALUE; 

                for (IntWritable value : values) { 

                    maxValue = Math.max(maxValue, value.get()); 

                } 

                context.write(key, new IntWritable(maxValue)); 

            } 

        } 

        @Override 

        public int run(String[] arg0) throws Exception { 

            Configuration conf = getConf(); 

            Job job = new Job(conf, "MaxTemper");// job name 

            job.setJarByClass(MaxTemper.class);// pointer class 

            FileInputFormat.addInputPath(job, new Path(arg0[0])); 

            FileOutputFormat.setOutputPath(job, new Path(arg0[1])); 

            job.setMapperClass(Map.class); 

            job.setCombinerClass(Reduce.class);// 指定一个合并函数,这里求最大值是可行,读者慎用 

            job.setReducerClass(Reduce.class); 

            job.setOutputFormatClass(TextOutputFormat.class); 

            job.setOutputKeyClass(Text.class); 

            job.setOutputValueClass(IntWritable.class); 

            job.waitForCompletion(true); 

            System.out.println("Job:" + job.getJobName()); 

            System.out.println("Execute stuts:" + job.isSuccessful()); 

            System.out.println("Input lines:" 

                    + job.getCounters().findCounter( 

                            "org.apache.hadoop.mapred.Task$Counter", 

                            "MAP_INPUT_RECORDS").getValue()); 

            System.out.println("Output lines:" 

                    + job.getCounters().findCounter( 

                            "org.apache.hadoop.mapred.Task$Counter", 

                            "MAP_OUTPUT_RECORDS").getValue()); 

            System.out.println("Skip lines:" 

                    + job.getCounters().findCounter(Counter.LINESKIP).getValue()); 

            return job.isSuccessful() ? 0 : 1; 

        } 

        public static void main(String[] args) throws Exception { 

            if (args.length != 2) { 

                System.err.println(""); 

                System.err.println("Usage: Test_2 < input path > < output path > "); 

                System.err.println("Example: hadoop jar ~/MaxTemper.jar hdfs://master:9000/user/test hdfs://master:9000/user/output"); 

                System.err.println("Counter:"); 

                System.err.println(" " + "LINESKIP" + " " 

                        + "Lines which are too short"); 

                System.exit(-1); 

            } 

            DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 

            Date start = new Date(); 

            Configuration config = new Configuration();

            int res = ToolRunner.run(config, new MaxTemper(), args); 

            Date end = new Date(); 

            float time = (float) ((end.getTime() - start.getTime()) / 60000.0); 

            System.out.println("Job start:" + formatter.format(start)); 

            System.out.println("Job finished:" + formatter.format(end)); 

            System.out.println("Job speeds time:" + String.valueOf(time) + " m"); 

            System.exit(res); 

        } 

    }

  • 相关阅读:
    Python语言解析xml文件
    运行manage.py db shell出错
    ImportError: No module named win32com.client
    ImportError: No module named urllib2
    《演讲之禅》 读书笔记
    no such table: django_admin_log
    2 策略模式(2)
    1 简单工厂模式
    2 策略模式(1)
    无言
  • 原文地址:https://www.cnblogs.com/ddhdd/p/8986901.html
Copyright © 2011-2022 走看看