zoukankan      html  css  js  c++  java
  • Hadoop MapReduce编程 API入门系列之挖掘气象数据版本3(九)

       不多说,直接上干货!

       下面,是版本1。

    Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一)

        下面是版本2。

    Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(九)

                  这篇博客带大家体会另一种版本的编程实现。

     

    代码

    package zhouls.bigdata.myMapReduce.weather;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class MyKey implements WritableComparable<MyKey>{
        //WritableComparable,实现这个方法,要多很多
        //readFields是读入,write是写出
        private int year;
        private int month;
        private double hot;
        public int getYear() {
        return year;
    }
    
        public void setYear(int year) {
            this.year = year;
        }
        
        public int getMonth() {
            return month;
        }
        
        public void setMonth(int month) {
            this.month = month;
        }
        
        public double getHot() {
            return hot;
        }
        
        public void setHot(double hot) {
            this.hot = hot;
            }//这一大段的get和set,可以右键,source,产生get和set,自动生成。
    
    
        public void readFields(DataInput arg0) throws IOException { //反序列化
            this.year=arg0.readInt();
            this.month=arg0.readInt();
            this.hot=arg0.readDouble();
        }
        
        public void write(DataOutput arg0) throws IOException { //序列化
            arg0.writeInt(year);
            arg0.writeInt(month);
            arg0.writeDouble(hot);
        }
    
        //判断对象是否是同一个对象,当该对象作为输出的key
        public int compareTo(MyKey o) {
            int r1 =Integer.compare(this.year, o.getYear());//比较当前的年和你传过来的年
            if(r1==0){
            int r2 =Integer.compare(this.month, o.getMonth());
            if(r2==0){
                return Double.compare(this.hot, o.getHot());
            }else{
                return r2;
            }
            }else{
                return r1;
            }
        }
    
    }
    package zhouls.bigdata.myMapReduce.weather;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    
    public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{//这里就是洗牌
    
        //执行时间越短越好
        public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
            return (key.getYear()-1949)%numReduceTasks;//对于一个数据集,找到最小,1949
        }
    }
    
    
    //1949-10-01 14:21:02    34c
    //1949-10-02 14:01:02    36c
    //1950-01-01 11:21:02    32c
    //1950-10-01 12:21:02    37c
    //1951-12-01 12:21:02    23c
    //1950-10-02 12:21:02    41c
    //1950-10-03 12:21:02    27c
    //1951-07-01 12:21:02    45c
    //1951-07-02 12:21:02    46c
    //1951-07-03 12:21:03    47c
    
     
    package zhouls.bigdata.myMapReduce.weather;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Comparator over {@code MyKey}: year ascending, then month ascending,
     * then temperature descending (hottest first).
     */
    public class MySort extends WritableComparator{

        public MySort(){
            // Register MyKey as the key class handled by this comparator.
            super(MyKey.class,true);
        }

        /**
         * Compares two {@code MyKey} instances.
         *
         * @return the year comparison if years differ, otherwise the month
         *         comparison if months differ, otherwise the negated temperature
         *         comparison
         */
        public int compare(WritableComparable a, WritableComparable b) {
            MyKey left = (MyKey) a;
            MyKey right = (MyKey) b;

            int byYear = Integer.compare(left.getYear(), right.getYear());
            if (byYear != 0) {
                return byYear;
            }

            int byMonth = Integer.compare(left.getMonth(), right.getMonth());
            if (byMonth != 0) {
                return byMonth;
            }

            // Same year and month: negate so that higher temperatures sort first.
            int byHot = Double.compare(left.getHot(), right.getHot());
            return -byHot;
        }
    }
    package zhouls.bigdata.myMapReduce.weather;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Comparator over {@code MyKey} that looks only at year and month; the
     * temperature is ignored, so keys of the same year and month compare as equal.
     */
    public class MyGroup extends WritableComparator{

        public MyGroup(){
            // Register MyKey as the key class handled by this comparator.
            super(MyKey.class,true);
        }

        /**
         * Compares two {@code MyKey} instances by year, then by month.
         *
         * @return the year comparison if years differ, otherwise the month comparison
         */
        public int compare(WritableComparable a, WritableComparable b) {
            MyKey left = (MyKey) a;
            MyKey right = (MyKey) b;

            int byYear = Integer.compare(left.getYear(), right.getYear());
            return byYear != 0
                    ? byYear
                    : Integer.compare(left.getMonth(), right.getMonth());
        }
    }
    
     
    package zhouls.bigdata.myMapReduce.weather;
    
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class RunJob {
    
    
    //    1949-10-01 14:21:02    34c WeatherMapper
    //    1949-10-02 14:01:02    36c
    //    1950-01-01 11:21:02    32c    分区在MyPartitioner.java 
    //    1950-10-01 12:21:02    37c
    //    1951-12-01 12:21:02    23c    排序在MySort.java
    //    1950-10-02 12:21:02    41c
    //    1950-10-03 12:21:02    27c    分组在MyGroup.java
    //    1951-07-01 12:21:02    45c
    //    1951-07-02 12:21:02    46c    再,WeatherReducer
    //    1951-07-03 12:21:03    47c
    
    //key:每行第一个隔开符(制表符)左边为key,右边为value    自定义类型MyKey,洗牌,    
        static class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{
        SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        NullWritable v =NullWritable.get();
    //    1949-10-01 14:21:02是自定义类型MyKey,即key
    //    34c是DoubleWritable,即value
    
        protected void map(Text key, Text value,Context context) throws IOException, InterruptedException {
        try {
            Date date =sdf.parse(key.toString());
            Calendar c =Calendar.getInstance();
            //Calendar 类是一个抽象类,可以通过调用 getInstance() 静态方法获取一个 Calendar 对象,
            //此对象已由当前日期时间初始化,即默认代表当前时间,如 Calendar c = Calendar.getInstance();    
            c.setTime(date);
            int year =c.get(Calendar.YEAR);
            int month =c.get(Calendar.MONTH);
    
            double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
            MyKey k =new MyKey();
            k.setYear(year);
            k.setMonth(month);
            k.setHot(hot);
            context.write(k, new DoubleWritable(hot));
        } catch (Exception e) {
            e.printStackTrace();
        }
        }
    }
    
        static class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{
        protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,Context arg2)throws IOException, InterruptedException {
            int i=0;
            for(DoubleWritable v :arg1){
            i++;
            String msg =arg0.getYear()+"	"+arg0.getMonth()+"	"+v.get();//"	"是制表符
            arg2.write(new Text(msg), NullWritable.get());
                    if(i==3){
                        break;
                    }
            }
        }
    }
    
    public static void main(String[] args) {
        Configuration config =new Configuration();
    //    config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
    //    config.set("yarn.resourcemanager.hostname", "HadoopMaster");
    //    config.set("mapred.jar", "C:\Users\Administrator\Desktop\wc.jar");
    //    config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//默认分隔符是制表符"	",这里自定义,如","
        try {
            FileSystem fs =FileSystem.get(config);
    
            Job job =Job.getInstance(config);
            job.setJarByClass(RunJob.class);
    
            job.setJobName("weather");
    
            job.setMapperClass(WeatherMapper.class);
            job.setReducerClass(WeatherReducer.class);
            job.setMapOutputKeyClass(MyKey.class);
            job.setMapOutputValueClass(DoubleWritable.class);
    
            job.setPartitionerClass(MyPartitioner.class);
            job.setSortComparatorClass(MySort.class);
            job.setGroupingComparatorClass(MyGroup.class);
    
            job.setNumReduceTasks(3);
    
            job.setInputFormatClass(KeyValueTextInputFormat.class);
    
    //    FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/weather.txt"));//输入路径,下有weather.txt
    //    
    //    Path outpath =new Path("hdfs://HadoopMaster:9000/out/weather");
    
            FileInputFormat.addInputPath(job, new Path("./data/weather.txt"));//输入路径,下有weather.txt
    
        Path outpath =new Path("./out/weather");
    
        if(fs.exists(outpath)){
            fs.delete(outpath, true);
        }
        FileOutputFormat.setOutputPath(job, outpath);
    
            boolean f= job.waitForCompletion(true);
            if(f){
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        }
    
    }
    欢迎大家,加入我的微信公众号:大数据躺过的坑
     
     

    同时,大家可以关注我的个人博客

       http://www.cnblogs.com/zlslch/   和  http://www.cnblogs.com/lchzls/ 

           以及对应本平台的QQ群:161156071(大数据躺过的坑)

  • 相关阅读:
    02.02.03第3章 餐饮项目案例(Power BI商业智能分析)
    02.02.02 第2章 制作power bi图表(Power BI商业智能分析)
    MySQL 目录结构信息
    Commons-FileUpload 文件上传(模板)
    Commons-FileUpload 常用API
    Java DOM方式解析XML(模板)
    常用的节点类型
    MySQL权限及登陆、退出方法
    Java 锁
    线程的状态
  • 原文地址:https://www.cnblogs.com/zlslch/p/6164729.html
Copyright © 2011-2022 走看看