  • Sorting Temperatures with MapReduce

    The code below implements temperature sorting with MapReduce (a secondary sort: records are ordered by year, and by temperature within each year); for a more detailed explanation, see other blog posts on the topic.
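
    The original post does not show the contents of hot.txt. Judging from the mapper's parsing logic, each record is a tab-separated line containing a "yyyy-MM-dd HH:mm:ss" timestamp and a temperature ending in "C"; the lines below are illustrative, made-up values only:

    1949-10-01 14:21:02	34C
    1950-01-22 11:55:00	10C
    1950-10-22 08:00:00	12C
    1949-04-23 09:10:25	22C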

    KeyPair.java

    package temperaturesort;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class KeyPair implements WritableComparable<KeyPair> {
        private int hot;
        private int year;
    
        public int getYear() {
            return year;
        }
    
        public void setYear(int year) {
            this.year = year;
        }
    
        public int getHot() {
            return hot;
        }
    
        public void setHot(int hot) {
            this.hot = hot;
        }
    
        // Sort by year in ascending order; within the same year, by temperature in descending order.
        public int compareTo(KeyPair o) {
            int result = this.year - o.getYear();
            if (result != 0) {
                return result < 0 ? -1 : 1;
            }
            return -(this.hot < o.getHot() ? -1 : (this.hot == o.getHot() ? 0 : 1));
        }
    
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(year);
            dataOutput.writeInt(hot);
        }
    
        public void readFields(DataInput dataInput) throws IOException {
            this.year=dataInput.readInt();
            this.hot=dataInput.readInt();
        }
    
        @Override
        public String toString() {
            return year+"	"+hot;
        }
    
        @Override
        public int hashCode() {
            return Integer.hashCode(year + hot);
        }
    }

    Sort.java:

    package temperaturesort;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    // Sort comparator for the map output: orders keys by year ascending, then temperature descending.
    public class Sort extends WritableComparator {
        public Sort(){
             super(KeyPair.class,true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            KeyPair key1 = (KeyPair)a;
            KeyPair key2 = (KeyPair)b;
            int result = key1.getYear()-key2.getYear();
            if(result!=0){
                return result<0?-1:1;
            }
            // Within the same year, sort temperatures in descending order.
            return key1.getHot() < key2.getHot() ? 1 : (key1.getHot() == key2.getHot() ? 0 : -1);
        }
    }

    Partition.java:

    package temperaturesort;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    // Partitioner: sends all records of the same year to the same reduce task.
    public class Partition extends Partitioner<KeyPair, Text> {
        @Override
        public int getPartition(KeyPair keyPair, Text text, int num) {
            return keyPair.getYear()*127 % num;
        }
    }

    Group.java:

    package temperaturesort;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    // Grouping comparator: on the reduce side, keys with the same year fall into the same reduce() call.
    public class Group extends WritableComparator {
        public Group(){
            super(KeyPair.class,true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            KeyPair key1 = (KeyPair)a;
            KeyPair key2 = (KeyPair)b;
            return key1.getYear() < key2.getYear() ? -1 : (key1.getYear()==key2.getYear()?0:1);
        }
    }

    RunJob.java:

    package temperaturesort;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    
    
    public class RunJob {
        public static class TempSortMapper extends Mapper<Object,Text,KeyPair,Text>{
            static SimpleDateFormat simpleDateFormat =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                // Each record is expected to be "yyyy-MM-dd HH:mm:ss<TAB><temperature>C".
                String[] strArr = line.split("\t");
                if(strArr.length==2){
                    try {
                        Date date = simpleDateFormat.parse(strArr[0]);
                        Calendar calendar = Calendar.getInstance();
                        calendar.setTime(date);
                        int year = calendar.get(Calendar.YEAR);
                        int hot = Integer.parseInt(strArr[1].substring(0,strArr[1].indexOf("C")));
                        KeyPair keyPair =new KeyPair();
                        keyPair.setHot(hot);
                        keyPair.setYear(year);
                        /*System.out.println("-------------------------------------------------------------------");
                        System.out.println(keyPair);*/
                        context.write(keyPair,value);
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                }
    
            }
        }
    
        public static class TempSortReducer extends Reducer<KeyPair,Text,KeyPair,Text>{
            @Override
            protected void reduce(KeyPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                // Each call receives all records of one year (grouped by Group); the values arrive in the
                // descending-temperature order established by the Sort comparator.
                for (Text text : values) {
                    context.write(key, text);
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            //System.setProperty("hadoop.home.dir", "E:\\softs\\majorSoft\\hadoop-2.7.5");
            Configuration conf = new Configuration();
            conf.set("mapreduce.app-submission.cross-platform", "true");
            Path fileInput = new Path("hdfs://mycluster/testFile/hot.txt");
            Path fileOutput = new Path("hdfs://mycluster/output/hot");
    
            Job job =Job.getInstance(conf ,"temperatureSort");
            job.setJar("E:\\bigData\\hadoopDemo\\out\\artifacts\\wordCount_jar\\hadoopDemo.jar");
            job.setJarByClass(RunJob.class);
            job.setMapperClass(TempSortMapper.class);
            job.setReducerClass(TempSortReducer.class);
            job.setMapOutputKeyClass(KeyPair.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setNumReduceTasks(3);
            job.setSortComparatorClass(Sort.class);       // sort map output: year ascending, temperature descending
            job.setPartitionerClass(Partition.class);     // same year -> same reduce task
            job.setGroupingComparatorClass(Group.class);  // group reduce input by year only
    
            FileInputFormat.addInputPath(job,fileInput);
            FileOutputFormat.setOutputPath(job,fileOutput);
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }

    The custom Sort comparator and Partition partitioner take effect on the map side, after the map tasks emit their output (they sort and partition the intermediate keys), while the Group comparator is applied on the reduce side to decide which keys end up in the same reduce() call.
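
    To see the resulting order without running a job, here is a small, self-contained sketch; it is not part of the original post, and the class name SortDemo and the sample year/temperature values are made up for illustration. It sorts a few KeyPair instances locally with the Sort comparator:

    SortDemo.java (illustrative only):

    package temperaturesort;

    import java.util.ArrayList;
    import java.util.List;

    // Local check of the ordering produced by the Sort comparator:
    // years ascending, temperatures descending within the same year.
    public class SortDemo {
        public static void main(String[] args) {
            int[][] samples = {{1950, 10}, {1949, 34}, {1950, 12}, {1949, 22}};
            List<KeyPair> keys = new ArrayList<KeyPair>();
            for (int[] s : samples) {
                KeyPair k = new KeyPair();
                k.setYear(s[0]);
                k.setHot(s[1]);
                keys.add(k);
            }
            Sort sort = new Sort();
            // Sort.compare(WritableComparable, WritableComparable) accepts KeyPair arguments directly.
            keys.sort((a, b) -> sort.compare(a, b));
            // Expected order: 1949 34, 1949 22, 1950 12, 1950 10
            for (KeyPair k : keys) {
                System.out.println(k);
            }
        }
    }

    Within each year group handed to a single reduce() call, the records therefore arrive with the highest temperature first, which is exactly what the reducer writes out.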

  • Original post: https://www.cnblogs.com/ksWorld/p/8670565.html