zoukankan      html  css  js  c++  java
  • MapReduce求最大值最小值问题

    import java.io.File;
    import java.io.IOException;
     
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
    public class GetMinMaxKeyMapReduce {
         
        public static class GetMinMaxKeyMap extends Mapper<Object, Text, Text,Text> {
            private Text min = new Text();
            private Text max = new Text();
            private Long i = new Long(0);
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String[] strs = value.toString().split("	");
                if (strs!=null && strs.length>5 &&strs[3].length() > 20 && strs[3].indexOf(" ") == -1 && strs[3].indexOf("=") == -1) {
                    if(i==0){
                        min= new Text(strs[3]);
                        max= new Text(strs[3]);
                    }
                    if(strs[3].compareTo(min.toString())<0){
                        min=new Text(strs[3]);
                    }
                    if(strs[3].compareTo(max.toString())>0){
                        max=new Text(strs[3]);
                    }
                    i++;
                }
            }
             
            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                context.write(new Text("min"), min);
                context.write(new Text("max"), max);
            }
        }
     
        public static class GetMinMaxKeyReducer extends Reducer<Text, Text, Text, Text> {
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                String result ="";
                for (Text value : values) {
                    if(result.equals("")){
                        result = value.toString();
                    }
                    if (("min").equals(key.toString())) {
                        if(value.toString().compareTo(result)<0){
                            result=value.toString();
                        }
                    } else if (("max").equals(key.toString())) {
                        if(value.toString().compareTo(result)>0){
                            result=value.toString();
                        }
                    } else {
                        System.err.println("未知reduce 输入key:" + key.toString());
                    }
                }
                context.write(key, new Text(result));
            }
        }
     
        public static void main(String[] args) throws Exception {
            File jarFile = EJob.createTempJar("bin");
            ClassLoader classLoader = EJob.getClassLoader();
            Thread.currentThread().setContextClassLoader(classLoader);
             
            //Hadoop 运行环境
            Configuration conf = new Configuration();
            conf.set("mapred.job.tracker", "bfdbjc1:12001");;
             
            //任务参数设置
            Job job = new Job(conf, "GetMinMaxKey");
     
            job.setJarByClass(GetMinMaxKeyMapReduce.class);
            job.setMapperClass(GetMinMaxKeyMap.class);
            job.setReducerClass(GetMinMaxKeyReducer.class);
     
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
     
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
             
            FileInputFormat.addInputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/tables2/raw_kafka/l_date=2013-09-15"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/output/minmaxkey/"));
             
            //Eclipse 本地提交
            ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
             
            //等待任务运行完成
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
  • 相关阅读:
    好题Islands
    DB2的安装
    MariaDB存在的问题
    MariaDB 脚本
    SQL 执行顺序
    Maria数据库
    3 ignite windows 上安装
    Cassandra 学习七 cassandra研究
    Cassandra学习六 一些知识点
    Cassandra学习五 使用Key的正确姿势
  • 原文地址:https://www.cnblogs.com/likai198981/p/3790403.html
Copyright © 2011-2022 走看看