zoukankan      html  css  js  c++  java
  • mr统计每年中每月温度的前三名

    weatherMapper

    package com.laoxiao.mr.weather;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{
    
        SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        protected void map(Text key, Text value, Context context) 
                throws java.io.IOException ,InterruptedException {
            try {
                Date d = df.parse(key.toString());
                Calendar c=Calendar.getInstance();
                c.setTime(d);
                int year=c.get(Calendar.YEAR);
                int month=c.get(Calendar.MONTH);
                double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
                context.write(new MyKey(year,month+1,hot), new DoubleWritable(hot));
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        };
    
    }
    View Code

    weatherReducer

     1 package com.laoxiao.mr.weather;
     2 
     3 import org.apache.hadoop.io.DoubleWritable;
     4 import org.apache.hadoop.io.NullWritable;
     5 import org.apache.hadoop.io.Text;
     6 import org.apache.hadoop.mapreduce.Reducer;
     7 
     8 public class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{
     9     protected void reduce(MyKey arg0, java.lang.Iterable<DoubleWritable> arg1, Context arg2) 
    10             throws java.io.IOException ,InterruptedException {
    11         int i=0;
    12         for(DoubleWritable d:arg1){
    13             i++;
    14             arg2.write(new Text(arg0.getYear()+"	"+arg0.getMonth()+"	"+d.get()),NullWritable.get());
    15             if(i==3){
    16                 break;
    17             }
    18         }
    19     };
    20 
    21 }
    View Code

    MyKey

    package com.laoxiao.mr.weather;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class MyKey implements WritableComparable<MyKey>{
    
        private int year;
        private int month;
        private double hot;
        public MyKey(int year, int month, double hot) {
            super();
            this.year = year;
            this.month = month;
            this.hot = hot;
        }
        public MyKey() {
            // TODO Auto-generated constructor stub
        }
        public int getYear() {
            return year;
        }
        public void setYear(int year) {
            this.year = year;
        }
        public int getMonth() {
            return month;
        }
        public void setMonth(int month) {
            this.month = month;
        }
        public double getHot() {
            return hot;
        }
        public void setHot(double hot) {
            this.hot = hot;
        }
        
        public void readFields(DataInput arg0) throws IOException {
            this.year=arg0.readInt();
            this.month=arg0.readInt();
            this.hot=arg0.readDouble();
        }
        public void write(DataOutput arg0) throws IOException {
            arg0.writeInt(year);
            arg0.writeInt(month);
            arg0.writeDouble(hot);
        }
        
        //判断对象是否是同一个对象,当该对象作为输出的key
        public int compareTo(MyKey o) {
            int r1 =Integer.compare(this.year, o.getYear());
            if(r1==0){
                int r2 =Integer.compare(this.month, o.getMonth());
                if(r2==0){
                    return  Double.compare(this.hot, o.getHot());
                }else{
                    return r2;
                }
            }else{
                return r1;
            }
        }
        
    }
    View Code

    MyPartitioner

    package com.laoxiao.mr.weather;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    
    public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{
        
        //执行时间越短越好
        public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
            return (key.getYear()-1949)%numReduceTasks;
        }
    
    }
    View Code

    MySort

    package com.laoxiao.mr.weather;
    
    
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.io.WritableComparable;
    
    public class MySort extends WritableComparator{
    
        public MySort() {
            super(MyKey.class,true);
        }
        
        public int compare(WritableComparable a, WritableComparable b) {
            MyKey k1=(MyKey)a;
            MyKey k2=(MyKey)b;
            int r1=Integer.compare(k1.getYear(), k2.getYear());
            if(r1==0){
                int r2=Integer.compare(k1.getMonth(), k2.getMonth());
                if(r2==0){
                    return -Double.compare(k1.getHot(),k2.getHot());
                }else{
                    return r2;
                }
            }else{
                return r1;
            }
            
        }
    }
    View Code

    MyGroup

    package com.laoxiao.mr.weather;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    
    public class MyGroup extends WritableComparator{
    
        public MyGroup(){
            super(MyKey.class,true);
        }
        
        public int compare(WritableComparable a, WritableComparable b) {
            MyKey k1 =(MyKey) a;
            MyKey k2 =(MyKey) b;
            int r1 =Integer.compare(k1.getYear(), k2.getYear());
            if(r1==0){
                return Integer.compare(k1.getMonth(), k2.getMonth());
            }else{
                return r1;
            }
            
        }
    }
    View Code

    设置了三个reducer进程,最后的结果就放到了三个文件中。

  • 相关阅读:
    Flush the AOS cache from code
    EntityConnectionStringBuilder 构造EF连接字符串
    AX中文转拼音
    AX2012 AOT中Web部署显示二级以上菜单
    clearCompanyCache
    AX2009 打印到PDF优化
    AX ODBC读取其他SQL数据库服务器数据
    AX2009报表打印固定长度Barcode条码
    Create Product Variant
    Rename AOT Object
  • 原文地址:https://www.cnblogs.com/scl1314/p/7498336.html
Copyright © 2011-2022 走看看