zoukankan      html  css  js  c++  java
  • 学习进度(3)

    时间:2020.10.26~2020.10.30

    学习了MapReduce的内容,了解了其工作机制

    代码量:500行  学习时间:7小时

    将项目提交到MapReduce运行会经常会出现reduce阶段不成功的现象,一定要对数据正确划分才行!!

    运行一个MapReduce程序的主要代码在于确定hdfs中文件的输入输出路径以及实现Mapper接口和Reduce接口来进行数据处理的类

    一个自认为比较典型的例子,是云计算的一次实验

    package com.yunjisuan;
    
    
    import java.io.IOException;
     
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class Temperature {
        /**
         * 四个泛型类型分别代表:
         * KeyIn        Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,...)
         * ValueIn      Mapper的输入数据的Value,这里是每行文字
         * KeyOut       Mapper的输出数据的Key,这里是每行文字中的“年份”
         * ValueOut     Mapper的输出数据的Value,这里是每行文字中的“气温”
         */
        static class TempMapper extends
                Mapper<LongWritable, Text, Text, IntWritable> {
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // 打印样本: Before Mapper: 0, 2000010115
                System.out.print("Before Mapper: " + key + ", " + value);
                String line = value.toString();
                String year = line.substring(0, 4);
                int temperature = Integer.parseInt(line.substring(8));
                context.write(new Text(year), new IntWritable(temperature));
                // 打印样本: After Mapper:2000, 15
                System.out.println(
                        "======" +
                        "After Mapper:" + new Text(year) + ", " + new IntWritable(temperature));
            }
        }
     
        /**
         * 四个泛型类型分别代表:
         * KeyIn        Reducer的输入数据的Key,这里是每行文字中的“年份”
         * ValueIn      Reducer的输入数据的Value,这里是每行文字中的“气温”
         * KeyOut       Reducer的输出数据的Key,这里是不重复的“年份”
         * ValueOut     Reducer的输出数据的Value,这里是这一年中的“最高气温”
         */
        static class TempReducer extends
                Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            public void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                int maxValue = Integer.MIN_VALUE;
                StringBuffer sb = new StringBuffer();
                //取values的最大值
                for (IntWritable value : values) {
                    maxValue = Math.max(maxValue, value.get());
                    sb.append(value).append(", ");
                }
                // 打印样本: Before Reduce: 2000, 15, 23, 99, 12, 22, 
                System.out.print("Before Reduce: " + key + ", " + sb.toString());
                context.write(key, new IntWritable(maxValue));
                // 打印样本: After Reduce: 2000, 99
                System.out.println(
                        "======" +
                        "After Reduce: " + key + ", " + maxValue);
            }
        }
     
        public static void main(String[] args) throws Exception {
        
            
            //输入路径
            String dst = "hdfs://hdp-01:9000/intput.txt";
            //输出路径,必须是不存在的,空文件加也不行。
            String dstOut = "hdfs://hdp-01:9000/output";
            Configuration hadoopConfig = new Configuration();
            hadoopConfig.set("fs.defaultFS", "hdfs://hdp-01:9000");
            hadoopConfig.set("fs.hdfs.impl", 
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
            );
            hadoopConfig.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem.class.getName()
            );
            Job job = new Job(hadoopConfig);
             
            //如果需要打成jar运行,需要下面这句
            //job.setJarByClass(NewMaxTemperature.class);
     
            //job执行作业时输入和输出文件的路径
            FileInputFormat.addInputPath(job, new Path(dst));
            FileOutputFormat.setOutputPath(job, new Path(dstOut));
     
            //指定自定义的Mapper和Reducer作为两个阶段的任务处理类
            job.setMapperClass(TempMapper.class);
            job.setReducerClass(TempReducer.class);
             
            //设置最后输出结果的Key和Value的类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
             
            //执行job,直到完成
            job.waitForCompletion(true);
            System.out.println("Finished");
        }
    }
  • 相关阅读:
    操作系统 chapter3 进程线程模型
    操作系统 chapter1 操作系统概述
    操作系统 chapter2 操作系统运行环境
    计算机网络 chapter 9 无线网络
    计算机网络 chapter 10 下一代因特网
    计算机网络 chapter 8 因特网上的音频/视频服务
    汇总常用的jQuery操作Table tr td方法
    jquery判断checkbox是否选中及改变checkbox状态
    $.ajax()方法详解
    wamp设置mysql默认编码
  • 原文地址:https://www.cnblogs.com/ywqtro/p/13935449.html
Copyright © 2011-2022 走看看