zoukankan      html  css  js  c++  java
  • Hadoop 学习笔记 (十) MapReduce实现排序 全局变量

    一些疑问:
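    1 全排序的话,应该设置 sortJob.setNumReduceTasks(1);(只有一个 reduce task 时,所有 key 才会进入同一个有序的输出文件)
    2 如果多个 reduce task 都去修改一个静态的 IntWritable,IntWritable 会乱序吧?(一般不会"乱序":各 reduce task 通常运行在各自的 JVM 中,静态变量并不在 task 之间共享,每个 task 只是各自从 1 开始编号;要得到全局连续的序号,仍然只能使用一个 reduce task。)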
    输入数据:
    file1
    2
    32
    654
    32
    15
    756
    65223
    file2
    5956
    22
    650
    92
    file3
    26
    54
    6




    import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class MySort { public static class IntSortMapper extends Mapper<Object, Text, IntWritable, NullWritable>{ private IntWritable val = new IntWritable(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString().trim(); val.set(Integer.parseInt(line)); context.write(val, NullWritable.get()); } } public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable,IntWritable>{ private IntWritable k = new IntWritable(); public void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException{ k.set(1); for (NullWritable value : values) { context.write(k, key); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String dir_in = "hdfs://localhost:9000/in_sort"; String dir_out = "hdfs://localhost:9000/out_sort"; Path in = new Path(dir_in); Path out = new Path(dir_out); Configuration conf = new Configuration(); Job sortJob = new Job(conf, "my_sort"); sortJob.setJarByClass(MySort.class); sortJob.setInputFormatClass(TextInputFormat.class); sortJob.setMapperClass(IntSortMapper.class); //sortJob.setCombinerClass(SortReducer.class); //countJob.setPartitionerClass(HashPartitioner.class); sortJob.setMapOutputKeyClass(IntWritable.class); sortJob.setMapOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(sortJob, in); sortJob.setReducerClass(IntSortReducer.class); sortJob.setNumReduceTasks(1); sortJob.setOutputKeyClass(IntWritable.class); sortJob.setOutputValueClass(IntWritable.class); //countJob.setOutputFormatClass(SequenceFileOutputFormat.class); FileOutputFormat.setOutputPath(sortJob, out); sortJob.waitForCompletion(true); } }
    结果:
    1    2
    1    6
    1    15
    1    22
    1    26
    1    32
    1    32
    1    54
    1    92
    1    650
    1    654
    1    756
    1    5956
    1    65223
    
    
    修改 reduce 函数(第二个参数不使用 Iterable):
    // NOTE(review): this variant does NOT override Reducer.reduce(KEYIN, Iterable<VALUEIN>, Context).
    // Its second parameter is a single NullWritable rather than Iterable<NullWritable>, so the
    // method below is merely an overload that the MapReduce framework never calls. Hadoop
    // therefore runs the inherited default reduce(), an identity reducer that writes every
    // (key, value) pair unchanged, i.e. (number, NullWritable). TextOutputFormat omits
    // NullWritable values (and the separator), which is why the result shown after this listing
    // contains only the numbers and no constant "1" key. Annotating the method with @Override
    // would turn this mistake into a compile-time error.
    public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable,IntWritable>{
            // Intended output key (set to the constant 1 below); unused in practice, see note above.
            private IntWritable k = new IntWritable();
            // Overload, not override: never invoked by the framework.
            public void reduce(IntWritable key, NullWritable value, Context context) throws IOException, InterruptedException{
                k.set(1);
                //for (NullWritable value : values) {
                    context.write(k, key);
                //}
            }
        }
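    结果:(不是很理解,为啥去掉 Iterable 后就只输出一个 value,key 哪去了呢?)
    注:原因是 reduce(IntWritable, NullWritable, Context) 与父类的 Reducer.reduce(KEYIN, Iterable<VALUEIN>, Context) 签名不一致,它只是一个重载方法,框架根本不会调用它。实际执行的是 Reducer 默认的 reduce(原样输出每个 key/value),写出的是 (数字, NullWritable)。TextOutputFormat 不输出 NullWritable 的值,所以每行只剩下数字,常量 1 也就不见了。给方法加上 @Override 就能在编译期发现这个错误。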
    2
    6
    15
    22
    26
    32
    32
    54
    92
    650
    654
    756
    5956
    65223
    
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class MySort {
    
        public static class IntSortMapper extends Mapper<Object, Text, IntWritable, NullWritable>{
            
            private IntWritable val = new IntWritable();
            
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
                String line = value.toString().trim();
                val.set(Integer.parseInt(line));
                context.write(val, NullWritable.get());
            }
        }
        
        public static class IntSortReducer extends Reducer<IntWritable, NullWritable, IntWritable,IntWritable>{
            private static IntWritable num = new IntWritable(1);
            public void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException{
                
                for (NullWritable value : values) {
                    context.write(num, key);
                    num = new IntWritable(num.get() + 1);
                }
            }
        }
        
        
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            String dir_in = "hdfs://localhost:9000/in_sort";
            String dir_out = "hdfs://localhost:9000/out_sort";
    
            Path in = new Path(dir_in);
            Path out = new Path(dir_out);
            
            Configuration conf = new Configuration();
            Job sortJob = new Job(conf, "my_sort");
    
            sortJob.setJarByClass(MySort.class);
    
            sortJob.setInputFormatClass(TextInputFormat.class);
            sortJob.setMapperClass(IntSortMapper.class);
            //sortJob.setCombinerClass(SortReducer.class);
            //countJob.setPartitionerClass(HashPartitioner.class);
            sortJob.setMapOutputKeyClass(IntWritable.class);
            sortJob.setMapOutputValueClass(NullWritable.class);
    
            FileInputFormat.addInputPath(sortJob, in);
    
            sortJob.setReducerClass(IntSortReducer.class);
            sortJob.setNumReduceTasks(1);
            sortJob.setOutputKeyClass(IntWritable.class);
            sortJob.setOutputValueClass(IntWritable.class);
            //countJob.setOutputFormatClass(SequenceFileOutputFormat.class);
    
            FileOutputFormat.setOutputPath(sortJob, out);
    
            sortJob.waitForCompletion(true);
    
        }
    
    }
    1    2
    2    6
    3    15
    4    22
    5    26
    6    32
    7    32
    8    54
    9    92
    10    650
    11    654
    12    756
    13    5956
    14    65223

     


  • 相关阅读:
    114. Flatten Binary Tree to Linked List
    odoo docker环境下将日志存储在数据库中ir_logging
    odoo 日志切割存储,日志存储到数据库中
    odoo 通过nginx反向代理后获取真实IP地址
    html样式超出长度部分使用省略号显示
    vim 查找字串所在的位置
    系统重启 后 Docker服务及容器自动启动设置
    字串格式化换format使用
    markdown 测试代码高亮
    协程与线程的简单区分
  • 原文地址:https://www.cnblogs.com/i80386/p/3608148.html
Copyright © 2011-2022 走看看