  • Twelve MR Exercises

    Problem

    There are two files, A and B, each containing several million lines of numbers. The task is to find the intersection and union of the two files' number sets, as well as the difference of A with respect to B (A - B).

    A quick outline of the approach:

    The crux of this problem is the design of the key and the value. Here I use the number itself as the key and the name of the file it came from as the value. With this layout, the reduce stage can easily produce the intersection, union, and difference of the numbers in files A and B.

    The union is simply every record the reduce stage emits. The intersection needs a filter: a record qualifies only if its values contain both file names, A and B. The difference consists of the records whose file-name set contains only A (or only B). For example, if 7 appears in both files while 9 appears only in A, the reducer sees (7, [A, B]) and (9, [A]): 7 belongs to the union and the intersection, while 9 belongs to the union and to A - B.

    Here is the MapReduce implementation:

    package com.zhyea.dev;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    public class ContentCompare {
    
    
        // Mapper: emits (number, source-file name) for every line of input.
        public static class SplitterMapper extends Mapper<Object, Text, Text, Text> {
    
            private Text text = new Text();
    
            @Override
            public void map(Object key, Text value, Context context) {
                try {
                    String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
                    text.set(fileName);
                    context.write(value, text);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        // Union: every key that reaches reduce occurs in A, in B, or in both, so emit it unconditionally.
        public static class UnionReducer extends Reducer<Text, Text, Text, NullWritable> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) {
                try {
                    context.write(key, NullWritable.get());
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        // Intersection: emit the key only if it was seen under both file names (the input files are assumed to be named exactly "A" and "B").
        public static class InterReducer extends Reducer<Text, Text, Text, NullWritable> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) {
                try {
                    Iterator<Text> itr = values.iterator();
                    boolean flagA = false;
                    boolean flagB = false;
                    while (itr.hasNext()) {
                        String s = itr.next().toString();
                        if (s.equals("B")) {
                            flagB = true;
                        }
                        if (s.equals("A")) {
                            flagA = true;
                        }
                    }
                    if (flagA && flagB) {
                        context.write(key, NullWritable.get());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        // Difference A - B: emit the key only if it was seen in file A but not in file B.
        public static class DiffAReducer extends Reducer<Text, Text, Text, NullWritable> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) {
                try {
                    Iterator<Text> itr = values.iterator();
                    boolean flagA = false;
                    boolean flagB = false;
                    while (itr.hasNext()) {
                        String s = itr.next().toString();
                        if (s.equals("A")) {
                            flagA = true;
                        }
                        if (s.equals("B")) {
                            flagB = true;
                        }
                    }
                    if (flagA && !flagB) {
                        context.write(key, NullWritable.get());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf, "content-compare");
            job.setJarByClass(ContentCompare.class);
    
            job.setMapperClass(SplitterMapper.class);
            // This job is wired for the A - B difference; swap in UnionReducer or InterReducer to compute the union or the intersection.
            job.setReducerClass(DiffAReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            job.setNumReduceTasks(1);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
    }

    The Spark implementation is much simpler; here we benefit greatly from the conciseness of Scala's syntax:

    package com.talkingdata.campaign
    
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
    import org.apache.spark.rdd.HadoopRDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object ContentCompare {
    
      def main(args: Array[String]): Unit = {
    
        val inputPath = args(0)
        val outputPath = args(1)
        val conf = new SparkConf().setAppName("content compare")
        val sc = new SparkContext(conf)
    
    
        val data = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath)
        val hadoopRDD = data.asInstanceOf[HadoopRDD[LongWritable, Text]]
    
        hadoopRDD.mapPartitionsWithInputSplit[(String, String)](readFile)
          .reduceByKey(_ + _)             // concatenate the file names seen for each number
          .filter(p => p._2.length == 2)  // keep numbers tagged by both files: the intersection
          .map(p => p._1)
          .repartition(1)
          .saveAsTextFile(outputPath)
    
        // Tags every line of a split with the name of the file it came from.
        def readFile(inputSplit: InputSplit, itr: Iterator[(LongWritable, Text)]) = {
          val fileName = inputSplit.asInstanceOf[FileSplit].getPath.getName
          itr.map(p => (p._2.toString, fileName))
        }
    
      }
    
    }

    The code above computes the intersection. The union hardly needs discussion: after reading the files, a reduce (or a distinct) is all it takes; a sketch follows below.
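
    For reference, here is a minimal union sketch along those lines, assuming inputPath is a directory or glob covering both A and B (the file names are irrelevant for the union, so a plain textFile suffices):

    sc.textFile(inputPath)        // read every line of both files
      .distinct()                 // duplicates within and across files collapse into one record
      .repartition(1)
      .saveAsTextFile(outputPath)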

    To compute the difference, only the predicate inside the filter needs to change:

    hadoopRDD.mapPartitionsWithInputSplit[(String, String)](readFile)
          .reduceByKey(_ + _)
          .filter(p => p._2.length == 1 && p._2 == "A")
          .map(p => p._1)
          .repartition(1)
          .saveAsTextFile(outputPath)
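
    One caveat about both Spark versions: the length-based checks assume each number occurs at most once per file, so the concatenated value can only be "A", "B", "AB", or "BA" (the Java reducers use boolean flags and do not have this limitation). If duplicate lines are possible, de-duplicating the (number, file name) pairs first keeps the filters valid; a sketch:

    hadoopRDD.mapPartitionsWithInputSplit[(String, String)](readFile)
          .distinct()                // drop repeated occurrences of a number within the same file
          .reduceByKey(_ + _)
          .filter(p => p._2 == "A")  // A - B; use p._2.length == 2 for the intersection
          .map(p => p._1)
          .repartition(1)
          .saveAsTextFile(outputPath)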

