zoukankan      html  css  js  c++  java
  • MapReduce实现二次排序(温度年份排序)

    文件内容:

    1949-10-01 14:21:02	34℃
    1949-10-02 14:01:02	36℃
    1950-01-01 14:21:02	32℃
    1950-10-01 11:01:02	37℃
    1951-10-01 14:21:02	23℃
    1950-10-02 17:11:02	41℃
    1950-10-01 18:20:02	27℃
    1951-07-01 14:01:02	45℃
    1951-07-02 13:21:02	46℃

    需求:

      按照年份升序排序,同时每一年中温度降序排序

    代码实现:

    一、KeyPair 自定义类

    package com.whu.test;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * Composite MapReduce key made of a year and a temperature reading.
     *
     * <p>The natural ordering defined by {@link #compareTo(KeyPair)} is year
     * ascending, then temperature ascending. {@link #equals(Object)} and
     * {@link #hashCode()} are consistent with that ordering: two keys are equal
     * exactly when both fields match.
     */
    public class KeyPair implements WritableComparable<KeyPair> {

        /** Calendar year of the reading. */
        private int year;

        /** Temperature of the reading (the numeric part that precedes "℃"). */
        private int hot;

        /** @return the year component of this key */
        public int getYear() {
            return year;
        }

        /** @param year the year component to store */
        public void setYear(int year) {
            this.year = year;
        }

        /** @return the temperature component of this key */
        public int getHot() {
            return hot;
        }

        /** @param hot the temperature component to store */
        public void setHot(int hot) {
            this.hot = hot;
        }

        /**
         * Restores both fields from {@code in}; the read order (year, then
         * temperature) mirrors {@link #write(DataOutput)}.
         *
         * @param in source of the serialized key
         * @throws IOException if reading from {@code in} fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.year = in.readInt();
            this.hot = in.readInt();
        }

        /**
         * Serializes the key as two ints: year first, then temperature.
         *
         * @param out destination for the serialized key
         * @throws IOException if writing to {@code out} fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(year);
            out.writeInt(hot);
        }

        /**
         * Orders keys by year ascending, breaking ties by temperature ascending.
         *
         * @param o the key to compare against
         * @return a negative, zero or positive value as this key is less than,
         *         equal to or greater than {@code o}
         */
        @Override
        public int compareTo(KeyPair o) {
            int y = Integer.compare(year, o.getYear());
            if (y == 0) {
                return Integer.compare(hot, o.getHot());
            }
            return y;
        }

        /**
         * Two keys are equal when both year and temperature match, which is
         * exactly when {@link #compareTo(KeyPair)} returns zero.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof KeyPair)) {
                return false;
            }
            KeyPair other = (KeyPair) obj;
            return year == other.year && hot == other.hot;
        }

        /** Hash derived from both fields so that equal keys hash identically. */
        @Override
        public int hashCode() {
            return 31 * year + hot;
        }

        /** Renders the key as {@code <year>年<hot>℃}. */
        @Override
        public String toString() {
            return year + "年" + hot + "℃";
        }
    }

    二、FirstPartitioner 自定义分区类

    package com.whu.test;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    /**
     * Partitioner that routes records by the year component of the key only,
     * so every record of a given year lands in the same partition regardless of
     * its temperature.
     */
    public class FirstPartitioner extends Partitioner<KeyPair, Text> {

        /**
         * Computes the partition index from the key's year.
         *
         * @param key   composite key; only its year is used
         * @param value record value (ignored)
         * @param nums  number of partitions
         * @return an index in {@code [0, nums)}
         */
        @Override
        public int getPartition(KeyPair key, Text value, int nums) {
            int scaledYear = key.getYear() * 127;
            // Clear the sign bit so the modulo below can never yield a negative index.
            int nonNegative = scaledYear & Integer.MAX_VALUE;
            return nonNegative % nums;
        }

    }

    三、SortKey 自定义排序类

    package com.whu.test;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Sort comparator for {@link KeyPair}: year ascending, and within the same
     * year temperature descending.
     */
    public class SortKey extends WritableComparator {

        /** Registers {@link KeyPair} as the key type handled by this comparator. */
        public SortKey() {
            super(KeyPair.class, true);
        }

        /**
         * Compares two {@link KeyPair} keys.
         *
         * @param a first key (must be a {@link KeyPair})
         * @param b second key (must be a {@link KeyPair})
         * @return the year comparison when the years differ; otherwise the
         *         reversed temperature comparison, so hotter readings sort first
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            KeyPair left = (KeyPair) a;
            KeyPair right = (KeyPair) b;
            int byYear = Integer.compare(left.getYear(), right.getYear());
            // Swapping the operands reverses the order: higher temperature first.
            return byYear != 0
                    ? byYear
                    : Integer.compare(right.getHot(), left.getHot());
        }
    }

    四、GroupComparator 自定义分组类

    package com.whu.test;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    
    
    /**
     * Grouping comparator for {@link KeyPair}: keys are considered the same
     * group when their years match; the temperature is ignored.
     */
    public class GroupComparator extends WritableComparator {

        /** Registers {@link KeyPair} as the key type handled by this comparator. */
        protected GroupComparator() {
            super(KeyPair.class, true);
        }

        /**
         * Compares two {@link KeyPair} keys by year only.
         *
         * @param a first key (must be a {@link KeyPair})
         * @param b second key (must be a {@link KeyPair})
         * @return the result of comparing the two years in ascending order
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            int leftYear = ((KeyPair) a).getYear();
            int rightYear = ((KeyPair) b).getYear();
            return Integer.compare(leftYear, rightYear);
        }

    }

    五、MyMapper 自定义Mapper类

    package com.whu.test;
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * Mapper that turns each input line of the form
     * {@code yyyy-MM-dd HH:mm:ss<TAB><temperature>℃} into a {@link KeyPair}
     * (year, temperature) key, emitting the untouched input line as the value.
     *
     * <p>Lines that do not contain two tab-separated fields, or whose timestamp
     * or temperature cannot be parsed, are skipped instead of failing the task
     * or being emitted with values left over from a previous record.
     */
    public class MyMapper extends Mapper<LongWritable, Text, KeyPair, Text> {

        /** Unit suffix that may follow the numeric temperature. */
        private static final String TEMPERATURE_UNIT = "℃";

        // SimpleDateFormat is not thread-safe, so each mapper instance owns its formatter.
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /** Key instance reused across map() calls; both fields are overwritten per record. */
        private final KeyPair k = new KeyPair();

        /**
         * Parses one input line and emits {@code (KeyPair(year, temperature), value)}.
         *
         * @param key     input key supplied by the framework (unused)
         * @param value   raw input line; its bytes are decoded as GBK for parsing
         * @param context sink for the emitted pair
         * @throws IOException          if decoding the line or writing the output fails
         * @throws InterruptedException if writing the output is interrupted
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
            String[] tmp = line.split("\t");
            if (tmp.length < 2) {
                // Not a "<timestamp>\t<temperature>" record; nothing to emit.
                return;
            }
            System.out.println(tmp[0] + "\t" + tmp[1]);

            int year;
            int hot;
            try {
                year = parseYear(tmp[0]);
                hot = parseTemperature(tmp[1]);
            } catch (ParseException | NumberFormatException e) {
                // Skip the record: emitting it would reuse the previous record's key fields.
                System.err.println("Skipping malformed record '" + line + "': " + e);
                return;
            }

            k.setYear(year);
            k.setHot(hot);
            context.write(k, value);
        }

        /** Extracts the calendar year from a {@code yyyy-MM-dd HH:mm:ss} timestamp. */
        private int parseYear(String timestamp) throws ParseException {
            Calendar cal = Calendar.getInstance();
            cal.setTime(sdf.parse(timestamp));
            return cal.get(Calendar.YEAR);
        }

        /** Parses the integer temperature, ignoring an optional trailing "℃" and surrounding whitespace. */
        private static int parseTemperature(String field) {
            String text = field.trim();
            int unitAt = text.indexOf(TEMPERATURE_UNIT);
            if (unitAt >= 0) {
                text = text.substring(0, unitAt);
            }
            return Integer.parseInt(text.trim());
        }

    }

    六、MyReducer 自定义Reducer类

    package com.whu.test;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Pass-through reducer: every value received for a group is written out
     * unchanged, paired with the key supplied for that group.
     */
    public class MyReducer extends Reducer<KeyPair, Text, KeyPair, Text> {

        /**
         * Emits one output pair per incoming value.
         *
         * @param key     key of the current group
         * @param value   values belonging to the group, in the order delivered
         * @param context sink for the emitted pairs
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if writing the output is interrupted
         */
        @Override
        protected void reduce(KeyPair key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            for (Text line : value) {
                context.write(key, line);
            }
        }
    }

    七、YearHot 自定义驱动类

    package com.whu.test;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    /**
     * Driver for the year/temperature secondary-sort job.
     *
     * <p>Usage: {@code YearHot [inputPath [outputPath]]}. When an argument is
     * omitted the corresponding built-in default path is used, so running with
     * no arguments behaves as before.
     */
    public class YearHot {

        /** Input used when no first argument is given. */
        private static final String DEFAULT_INPUT_PATH =
                "hdfs://192.168.228.134:/usr/qqx/yhinput/file.txt";

        /** Output directory used when no second argument is given. */
        private static final String DEFAULT_OUTPUT_PATH =
                "hdfs://192.168.228.134:/usr/qqx/yhoutput";

        /**
         * Configures and runs the job, exiting with status 0 on success and 1 on failure.
         *
         * @param args optional {@code inputPath} and {@code outputPath}
         * @throws Exception if the job cannot be configured or submitted
         */
        public static void main(String[] args) throws Exception {
            String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
            String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "year hot sort");

            job.setJarByClass(YearHot.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            // Partition by year, sort by (year asc, temperature desc), group by year.
            job.setNumReduceTasks(3);
            job.setPartitionerClass(FirstPartitioner.class);
            job.setSortComparatorClass(SortKey.class);
            job.setGroupingComparatorClass(GroupComparator.class);

            job.setOutputKeyClass(KeyPair.class);
            job.setOutputValueClass(Text.class);
            job.setOutputFormatClass(GBKOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  • 相关阅读:
    1040 Longest Symmetric String (25)
    1068 Find More Coins (30)
    1045 Favorite Color Stripe (30)
    1008 数组元素循环右移问题 (20)
    1007. 素数对猜想
    1005. 继续(3n+1)猜想 (25)
    1001. 害死人不偿命的(3n+1)猜想 (15)
    递归经典面试题_ 小例
    简单实现_控制台小时钟
    使用Timer组件_实现定时更改窗体颜色
  • 原文地址:https://www.cnblogs.com/qiaoqianxiong/p/4991826.html
Copyright © 2011-2022 走看看