Secondary Sort (implemented with Hadoop and with Spark)

    Without further ado, straight to the good stuff!

    The algorithm in this post comes from the book 《数据算法:Hadoop/Spark大数据处理技巧》 (the Chinese edition of Data Algorithms). The code in the book has the right idea but is incomplete, and only the Java side is given, so I have completed it and added a Scala version, written while working with Spark.

    1. Input, expected output, and approach

    The input is SecondarySort.txt, with the following contents:

    2000,12,04,10
    2000,11,01,20
    2000,12,02,-20
    2000,11,07,30
    2000,11,24,-40
    2012,12,21,30
    2012,12,22,-20
    2012,12,23,60
    2012,12,24,70
    2012,12,25,10
    2013,01,23,90
    2013,01,24,70
    2013,01,20,-10

    Each field means: year, month, day, temperature.

    Expected output:

    2013-01 90,70,-10
    2012-12 70,60,30,10,-20
    2000-12 10,-20
    2000-11 30,20,-40

    Meaning:

    year-month temperature1,temperature2,temperature3,...

    The year-month groups are sorted in descending order from top to bottom,
    and the temperatures on each line are sorted in descending order from left to right.

    Approach:

    Discard the day field, which is not needed.

    Use the year and month as a composite key, compare the keys, and sort them in descending order.

    For each year-month key, sort the corresponding temperature values in descending order and join them with commas (a small local sketch of this target transformation follows below).
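
    Before the MapReduce version, here is a minimal plain-Java sketch of the same idea applied in memory to the sample lines. It is my own illustration, not part of the job and not from the book; it only exists to make the target transformation concrete:

    import java.util.*;

    public class LocalSecondarySortSketch {
        public static void main(String[] args) {
            String[] lines = {
                "2000,12,04,10", "2000,11,01,20", "2000,12,02,-20", "2000,11,07,30",
                "2000,11,24,-40", "2012,12,21,30", "2012,12,22,-20", "2012,12,23,60",
                "2012,12,24,70", "2012,12,25,10", "2013,01,23,90", "2013,01,24,70",
                "2013,01,20,-10"
            };
            // year-month -> temperatures, with the keys kept in descending order
            TreeMap<String, List<Integer>> byMonth = new TreeMap<>(Comparator.reverseOrder());
            for (String line : lines) {
                String[] t = line.split(",");          // year, month, day, temperature
                String yearMonth = t[0] + "-" + t[1];  // the day (t[2]) is dropped
                byMonth.computeIfAbsent(yearMonth, k -> new ArrayList<>())
                       .add(Integer.parseInt(t[3]));
            }
            for (Map.Entry<String, List<Integer>> e : byMonth.entrySet()) {
                e.getValue().sort(Comparator.reverseOrder()); // temperatures descending
                StringBuilder sb = new StringBuilder();
                for (int temp : e.getValue()) sb.append(temp).append(",");
                sb.deleteCharAt(sb.length() - 1);
                System.out.println(e.getKey() + " " + sb);
            }
        }
    }

    Running it prints the four expected lines shown above.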

    2. Implementing secondary sort with a MapReduce program written in Java

    The classes to implement are:

    besides the usual SecondarySortingMapper, SecondarySortingReducer, and SecondarySortDriver,

    there are also two plug-in classes (DateTemperatureGroupingComparator and DateTemperaturePartitioner) and one custom key type (DateTemperaturePair).

    The code follows (note that I removed the package declaration from every file, so add your own if you want to use it):

    SecondarySortDriver.java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class SecondarySortDriver extends Configured implements Tool {
        public int run(String[] args) throws Exception {
            Configuration configuration = getConf();
            Job job = Job.getInstance(configuration, "SecondarySort");
            job.setJarByClass(SecondarySortDriver.class);
            job.setJobName("SecondarySort");
    
            Path inputPath = new Path(args[0]);
            Path outputPath = new Path(args[1]);
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
    
            // Set the map output key/value types
            job.setMapOutputKeyClass(DateTemperaturePair.class);
            job.setMapOutputValueClass(IntWritable.class);
            // Set the final output key/value types (the reducer emits a Text list of temperatures)
            job.setOutputKeyClass(DateTemperaturePair.class);
            job.setOutputValueClass(Text.class);
    
            job.setMapperClass(SecondarySortingMapper.class);
            job.setReducerClass(SecondarySortingReducer.class);
            job.setPartitionerClass(DateTemperaturePartitioner.class);
            job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
    
            boolean status = job.waitForCompletion(true);
            return status ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            if (args.length != 2) {
                throw new IllegalArgumentException(
                        "Usage: SecondarySortDriver <input-path> <output-path>");
            }
            int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
            System.exit(returnStatus);
        }
    }
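
    (To run the job, package these classes into a jar and launch it the usual way, for example hadoop jar secondarysort.jar SecondarySortDriver <input-path> <output-path>, where the jar name is only a placeholder of my choosing; the output directory must not already exist.)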

    DateTemperaturePair.java

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class DateTemperaturePair implements WritableComparable<DateTemperaturePair> {
        private String yearMonth;
        private String day;
        protected Integer temperature;
    
        // Compare by yearMonth first, then by temperature, and negate the result
        // so that both the month and the temperature sort in descending order.
        public int compareTo(DateTemperaturePair o) {
            int compareValue = this.yearMonth.compareTo(o.getYearMonth());
            if (compareValue == 0) {
                compareValue = temperature.compareTo(o.getTemperature());
            }
            return -1 * compareValue;
        }
    
        // Only yearMonth and temperature are serialized; the day is not needed
        // once the map phase has built the composite key.
        public void write(DataOutput dataOutput) throws IOException {
            Text.writeString(dataOutput, yearMonth);
            dataOutput.writeInt(temperature);
        }
    
        public void readFields(DataInput dataInput) throws IOException {
            this.yearMonth = Text.readString(dataInput);
            this.temperature = dataInput.readInt();
        }
    
        @Override
        public String toString() {
            return yearMonth;
        }
    
        public String getYearMonth() {
            return yearMonth;
        }
    
        public void setYearMonth(String text) {
            this.yearMonth = text;
        }
    
        public String getDay() {
            return day;
        }
    
        public void setDay(String day) {
            this.day = day;
        }
    
        public Integer getTemperature() {
            return temperature;
        }
    
        public void setTemperature(Integer temperature) {
            this.temperature = temperature;
        }
    }
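
    A quick way to convince yourself that the comparator really gives the descending order described above is a throwaway main like the one below. This is my own addition, not from the book, and the class name is arbitrary:

    public class DateTemperaturePairCheck {
        public static void main(String[] args) {
            DateTemperaturePair a = new DateTemperaturePair();
            a.setYearMonth("2013-01");
            a.setTemperature(90);
    
            DateTemperaturePair b = new DateTemperaturePair();
            b.setYearMonth("2013-01");
            b.setTemperature(70);
    
            DateTemperaturePair c = new DateTemperaturePair();
            c.setYearMonth("2000-11");
            c.setTemperature(30);
    
            // Same month: the warmer record sorts first (negative means "a comes before b")
            System.out.println(a.compareTo(b));   // negative
            // Different months: the later month sorts first
            System.out.println(a.compareTo(c));   // negative
        }
    }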

    SecondarySortingMapper.java

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class SecondarySortingMapper extends
            Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            // YYYY = tokens[0]
            // MM = tokens[1]
            // DD = tokens[2]
            // temperature = tokens[3]
            String yearMonth = tokens[0] + "-" + tokens[1];
            String day = tokens[2];
            int temperature = Integer.parseInt(tokens[3]);
    
            // The temperature is placed in the composite key (so the framework
            // sorts by it) and is also emitted as the value (so the reducer can
            // concatenate it into the output line).
            DateTemperaturePair reduceKey = new DateTemperaturePair();
            reduceKey.setYearMonth(yearMonth);
            reduceKey.setDay(day);
            reduceKey.setTemperature(temperature);
            context.write(reduceKey, new IntWritable(temperature));
        }
    }

    DateTemperaturePartitioner.java

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class DateTemperaturePartitioner extends
            Partitioner<DateTemperaturePair, IntWritable> {
        @Override
        public int getPartition(DateTemperaturePair dateTemperaturePair,
                IntWritable temperature, int numberOfPartitions) {
            // Partition on yearMonth only, so that every record of a given month
            // is sent to the same reduce task.
            return Math.abs(dateTemperaturePair.getYearMonth().hashCode() % numberOfPartitions);
        }
    }

    DateTemperatureGroupingComparator.java

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    // Groups the reducer input by yearMonth only, so that all temperatures of the
    // same month (whatever temperature is stored in each key) are handed to a
    // single reduce() call, already sorted by the key's descending compareTo.
    public class DateTemperatureGroupingComparator extends WritableComparator {
    
        public DateTemperatureGroupingComparator() {
            super(DateTemperaturePair.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            DateTemperaturePair pair1 = (DateTemperaturePair) a;
            DateTemperaturePair pair2 = (DateTemperaturePair) b;
            return pair1.getYearMonth().compareTo(pair2.getYearMonth());
        }
    }
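
    (Taken together, the pieces above are what make the secondary sort work: DateTemperaturePair.compareTo makes the shuffle sort records by year-month and then by temperature, both descending; DateTemperaturePartitioner sends every record of a given month to the same reduce task; and DateTemperatureGroupingComparator makes all of those records, regardless of the temperature in their key, arrive in a single reduce() call, so the reducer sees the temperatures already in descending order.)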

    SecondarySortingReducer.java

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class SecondarySortingReducer extends
            Reducer<DateTemperaturePair, IntWritable, DateTemperaturePair, Text> {
    
        @Override
        protected void reduce(DateTemperaturePair key,
                Iterable<IntWritable> values, Context context) throws IOException,
                InterruptedException {
            // Thanks to the composite key's compareTo and the grouping comparator,
            // the values already arrive here sorted in descending order.
            StringBuilder sortedTemperatureList = new StringBuilder();
            for (IntWritable temperature : values) {
                sortedTemperatureList.append(temperature);
                sortedTemperatureList.append(",");
            }
            sortedTemperatureList.deleteCharAt(sortedTemperatureList.length() - 1);
            context.write(key, new Text(sortedTemperatureList.toString()));
        }
    
    }

    3. Implementing secondary sort with a Spark program written in Scala

    This code is naturally much more concise. Here it is:

    SecondarySort.scala

    package spark
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
    import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
    
    object SecondarySort {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Secondary Sort").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        // val file = sc.textFile("hdfs://localhost:9000/Spark/SecondarySort/Input/SecondarySort2.txt")
        val file = sc.textFile("e:\\SecondarySort.txt") // the backslash must be escaped in a Scala string literal
        val rdd = file.map(line => line.split(","))
          // key = (year, month); parse the temperature to Int so it sorts numerically rather than as a string
          .map(x => ((x(0), x(1)), x(3).toInt))
          .groupByKey()
          .sortByKey(false) // year-month in descending order
          .map(x => (x._1._1 + "-" + x._1._2, x._2.toList.sortWith(_ > _))) // temperatures in descending order
        // foreach runs on the executors; in local mode the lines print in sorted order,
        // but on a cluster you would collect() first to print them in order on the driver.
        rdd.foreach { x =>
          val buf = new StringBuilder()
          for (a <- x._2) {
            buf.append(a)
            buf.append(",")
          }
          buf.deleteCharAt(buf.length - 1)
          println(x._1 + " " + buf.toString())
        }
        sc.stop()
      }
    }
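
    Since the master and the input path are hard-coded, I simply run this from the IDE. To submit it to a cluster instead, you would package it, launch it with spark-submit using --class spark.SecondarySort, drop the setMaster("local") call, and switch to the HDFS input path that is commented out above.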
Original post: https://www.cnblogs.com/zlslch/p/6949779.html