  • MapReduce (3): Typical Scenarios (1)

    I. Chaining multiple MapReduce jobs

       1. Requirement

    A moderately complex processing pipeline often needs several MapReduce programs run in sequence. Chaining multiple jobs can be done with the JobControl facility provided by the MapReduce framework.

       2. Example

    The example below contains two MapReduce jobs, a flow-sum job and a flow-sort job, with a dependency between them: the output of the sum job is the input of the sort job, so the sort job must not start until the sum job has finished.

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // First job: flow sum
        Job jobsum = Job.getInstance(conf);
        jobsum.setJarByClass(RunManyJobMR.class);
        jobsum.setMapperClass(FlowSumMapper.class);
        jobsum.setReducerClass(FlowSumReducer.class);
        jobsum.setMapOutputKeyClass(Text.class);
        jobsum.setMapOutputValueClass(Flow.class);
        jobsum.setCombinerClass(FlowSumReducer.class);
        jobsum.setOutputKeyClass(Text.class);
        jobsum.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(jobsum, "d:/flow/input");
        FileOutputFormat.setOutputPath(jobsum, new Path("d:/flow/output12"));

        // Second job: flow sort, which consumes the output of the sum job
        Job jobsort = Job.getInstance(conf);
        jobsort.setJarByClass(RunManyJobMR.class);
        jobsort.setMapperClass(FlowSortMapper.class);
        jobsort.setReducerClass(FlowSortReducer.class);
        jobsort.setMapOutputKeyClass(Flow.class);
        jobsort.setMapOutputValueClass(Text.class);
        jobsort.setOutputKeyClass(NullWritable.class);
        jobsort.setOutputValueClass(Flow.class);
        FileInputFormat.setInputPaths(jobsort, "d:/flow/output12");
        FileOutputFormat.setOutputPath(jobsort, new Path("d:/flow/sortoutput12"));

        // Wrap both jobs as ControlledJobs and declare the dependency
        ControlledJob sumcj = new ControlledJob(jobsum.getConfiguration());
        ControlledJob sortcj = new ControlledJob(jobsort.getConfiguration());
        sumcj.setJob(jobsum);
        sortcj.setJob(jobsort);
        // the sort job depends on the sum job
        sortcj.addDependingJob(sumcj);

        JobControl jc = new JobControl("flow sum and sort");
        jc.addJob(sumcj);
        jc.addJob(sortcj);

        // JobControl is a Runnable: run it in a thread and poll until all jobs finish
        Thread jobThread = new Thread(jc);
        jobThread.start();
        while (!jc.allFinished()) {
            Thread.sleep(500);
        }
        jc.stop();
    }
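
      If the pipeline is strictly linear like this one, a simpler alternative (a sketch of my own, not from the original post) is to submit the jobs one after another with waitForCompletion and only start the sort job once the sum job has succeeded; JobControl pays off mainly when the dependency graph has several branches. Reusing the jobsum and jobsort objects configured above, the end of main could instead be:

    // Sequential alternative (sketch): start the sort job only if the sum job succeeded.
    // Assumes jobsum and jobsort are configured exactly as in the code above.
    if (jobsum.waitForCompletion(true)) {
        System.exit(jobsort.waitForCompletion(true) ? 0 : 1);
    }
    System.exit(1);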

    II. TopN implementation with a custom GroupingComparator

        1. Requirement

       In the small student-score project there is a new requirement:
       for each class, find the student with the highest average score and output the class, name, and average score.

       2. Analysis

    (1) Use "class + average score" as the map output key, so all student records read in the map phase are sorted by class and by score in descending order before being sent to reduce.
    (2) On the reduce side, use a GroupingComparator to group key-value pairs with the same class together; the first record of each group is then the maximum.
       3. Implementation

     The input data looks like this (tab-separated: class, student name, then a variable number of exam scores):

    computer	huangxiaoming	85	86	41	75	93	42	85
    computer	xuzheng	54	52	86	91	42
    computer	huangbo	85	42	96	38	
    english	zhaobenshan	54	52	86	91	42	85	75
    english	liuyifei	85	41	75	21	85	96	14
    algorithm	liuyifei	75	85	62	48	54	96	15
    computer	huangjiaju	85	75	86	85	85
    english	liuyifei	76	95	86	74	68	74	48
    

      Step 1: combine the grouping and sorting fields into a single custom key object

    package com.ghgj.mr.topn;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    /**
     * Composite key: class + average score.
     */
    public class ClazzScore implements WritableComparable<ClazzScore> {

        private String clazz;
        private Double score;

        public String getClazz() {
            return clazz;
        }

        public void setClazz(String clazz) {
            this.clazz = clazz;
        }

        public Double getScore() {
            return score;
        }

        public void setScore(Double score) {
            this.score = score;
        }

        public ClazzScore(String clazz, Double score) {
            this.clazz = clazz;
            this.score = score;
        }

        public ClazzScore() {
        }

        @Override
        public String toString() {
            return clazz + "\t" + score;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(clazz);
            out.writeDouble(score);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.clazz = in.readUTF();
            this.score = in.readDouble();
        }

        /**
         * Key ordering: group by class; within a class, higher scores sort first.
         */
        @Override
        public int compareTo(ClazzScore cs) {
            int it = cs.getClazz().compareTo(this.clazz);
            if (it == 0) {
                // Double.compare avoids the precision loss of casting the difference to int
                return Double.compare(cs.getScore(), this.score);
            } else {
                return it;
            }
        }
    }
    

      Step 2: write the grouping rule that decides how the sorted ClazzScore records are grouped when they enter the reduce task

    package com.ghgj.mr.topn;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class ClazzScoreGroupComparator extends WritableComparator {

        ClazzScoreGroupComparator() {
            super(ClazzScore.class, true);
        }

        /**
         * Grouping rule for the records fed into reduce: keys with the same clazz are
         * treated as one group, so a single reduce() call sees all records of a class.
         * The sort order itself is still defined by ClazzScore.compareTo().
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            ClazzScore cs1 = (ClazzScore) a;
            ClazzScore cs2 = (ClazzScore) b;
            return cs1.getClazz().compareTo(cs2.getClazz());
        }
    }
    

      Step 3: write the MapReduce driver, mapper, and reducer

    package com.ghgj.mr.topn;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * TopN problem: per class, emit the highest average score.
     */
    public class ScoreTop1MR {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(ScoreTop1MR.class);
            job.setMapperClass(ScoreTop1MRMapper.class);
            job.setReducerClass(ScoreTop1MRReducer.class);
            job.setOutputKeyClass(ClazzScore.class);
            job.setOutputValueClass(DoubleWritable.class);

            // grouping rule for the records passed into the reducer
            job.setGroupingComparatorClass(ClazzScoreGroupComparator.class);

            FileInputFormat.setInputPaths(job, "d:/score_all/input");
            Path p = new Path("d:/score_all/output1");
            FileSystem fs = FileSystem.newInstance(conf);
            if (fs.exists(p)) {
                fs.delete(p, true);
            }
            FileOutputFormat.setOutputPath(job, p);

            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        }

        static class ScoreTop1MRMapper extends Mapper<LongWritable, Text, ClazzScore, DoubleWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // line format: class \t name \t score1 \t score2 ...
                String[] splits = value.toString().split("\t");
                // compute the average of all score fields, as required
                double sum = 0;
                for (int i = 2; i < splits.length; i++) {
                    sum += Double.parseDouble(splits[i]);
                }
                double avg = sum / (splits.length - 2);
                ClazzScore cs = new ClazzScore(splits[0], avg);
                context.write(cs, new DoubleWritable(avg));
            }
        }

        static class ScoreTop1MRReducer extends Reducer<ClazzScore, DoubleWritable, ClazzScore, DoubleWritable> {
            @Override
            protected void reduce(ClazzScore cs, Iterable<DoubleWritable> scores, Context context)
                    throws IOException, InterruptedException {
                // keys arrive sorted by score descending within each class,
                // so the first value of the group is the Top 1
                context.write(cs, scores.iterator().next());
            }
        }
    }
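
      The class above is named ScoreTop1MR because it keeps only the first record of each group. As a rough sketch of my own (not part of the original post), a general Top-N per class can be obtained by letting the reducer walk the first N values of each group, while the driver, mapper, key class, and grouping comparator stay unchanged:

    // Hypothetical Top-N variant of the reducer (N = 3 here).
    static class ScoreTopNMRReducer extends Reducer<ClazzScore, DoubleWritable, ClazzScore, DoubleWritable> {
        private static final int N = 3;

        @Override
        protected void reduce(ClazzScore cs, Iterable<DoubleWritable> scores, Context context)
                throws IOException, InterruptedException {
            int emitted = 0;
            // values of one group arrive sorted by score in descending order,
            // so the first N values are the Top N of that class
            for (DoubleWritable score : scores) {
                context.write(cs, score);
                if (++emitted >= N) {
                    break;
                }
            }
        }
    }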

     III. MapReduce global counters

         1. Requirement

    In production code it is common to keep a global count of malformed or otherwise non-conforming records encountered while processing data. Requirements like this can be met with the global counters provided by the MapReduce framework.
         2. Example

    The following program uses global counters to count the total number of words and the total number of lines across all files in a directory.

       

    package com.ghgj.mr.counter;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCount {
    	
    	enum MyWordCounter{COUNT_LINES,COUNT_WORD}
    //	enum Weekday{MONDAY, TUESDAY, WENSDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY}
    
    	public static void main(String[] args) throws Exception {
    		// HDFS-related configuration
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf);
    		// set the jar containing this driver class
    		job.setJarByClass(WordCount.class);
    		job.setMapperClass(WCMapper.class);
    		job.setReducerClass(WCReducer.class);
    		
    		// output types of the reduce task
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(LongWritable.class);
    		
    		// local paths
    		Path inputPath = new Path("d:/wordcount/input");
    		Path outputPath = new Path("d:/wordcount/output");
    		
    		FileSystem fs = FileSystem.get(conf);
    		if(fs.exists(outputPath)){
    			fs.delete(outputPath, true);
    		}
    		FileInputFormat.setInputPaths(job, inputPath);
    		FileOutputFormat.setOutputPath(job, outputPath);
    		
    		// submit the job and wait for it to finish
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.exit(waitForCompletion?0:1);
    	}
    	
    	private static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			
    //			COUNT_LINES++;
    			context.getCounter(MyWordCounter.COUNT_LINES).increment(1L);
    			
    			// map task business logic goes here
    			String[] words = value.toString().split(" ");
    			for(String word: words){
    				context.write(new Text(word), new LongWritable(1));
    				context.getCounter(MyWordCounter.COUNT_WORD).increment(1L);
    			}
    		}
    	}
    	
    	private static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    		@Override
    		protected void reduce(Text key, Iterable<LongWritable> values, Context context)
    				throws IOException, InterruptedException {
    			// reduce task business logic goes here
    			long sum = 0;
    			for(LongWritable v: values){
    				sum += v.get();
    			}
    			context.write(key, new LongWritable(sum));
    		}
    	}
    }
    

      Alternatively, a map-only variant (no reducer, counters only):

    package com.ghgj.mr.counter;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CounterWordCount {

        enum CouterWordCountC{COUNT_WORDS, COUNT_LINES}

        public static void main(String[] args) throws Exception {
            // HDFS-related configuration
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // set the jar containing this driver class
            job.setJarByClass(CounterWordCount.class);
            job.setMapperClass(WCCounterMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            // local paths
            Path inputPath = new Path("d:/wordcount/input");
            FileInputFormat.setInputPaths(job, inputPath);

            // map-only job: no reduce tasks
            job.setNumReduceTasks(0);

            Path outputPath = new Path("d:/wordcount/output");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);

            // submit the job and wait for it to finish
            boolean waitForCompletion = job.waitForCompletion(true);
            System.exit(waitForCompletion ? 0 : 1);
        }

        private static class WCCounterMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // count lines: the default input format reads one line per map() call, so +1 per call
                context.getCounter(CouterWordCountC.COUNT_LINES).increment(1L);
                String[] words = value.toString().split(" ");
                for (String word : words) {
                    // count words: +1 for every word encountered
                    context.getCounter(CouterWordCountC.COUNT_WORDS).increment(1L);
                }
            }
        }
    }
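
      Counter totals are printed as part of the job summary when waitForCompletion(true) returns. If the driver needs the values programmatically, they can also be read back from the Job object after the job has finished; a small sketch of my own (reusing the CounterWordCount job and enum above) replacing the tail of main:

    // Read the counter values back in the driver once the job has finished.
    boolean ok = job.waitForCompletion(true);
    long lines = job.getCounters().findCounter(CouterWordCountC.COUNT_LINES).getValue();
    long words = job.getCounters().findCounter(CouterWordCountC.COUNT_WORDS).getValue();
    System.out.println("lines = " + lines + ", words = " + words);
    System.exit(ok ? 0 : 1);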
    

      

  • Original post: https://www.cnblogs.com/liuwei6/p/6724070.html