  • MapReduce: take the TopN of each split in the map phase, then take the global TopN in the reduce phase

    ZX: In Hadoop's map phase, every map task can carry out special work of its own, which includes picking out the largest few values in its split. The per-split results are then gathered, and the TopN is taken again.

      The benefit: suppose there are 20,000 splits holding 200 million records, and we want the Top10. After the map phase, each map task has emitted only its local Top10, so the final step processes just 20,000 × 10 = 200,000 records instead of 200 million, which greatly reduces bandwidth consumption (see the sketch below).
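      A minimal plain-Java sketch of the two-stage idea, outside Hadoop (the split contents and the Top5 size here are made up for illustration):

    import java.util.*;
    import java.util.stream.*;

    public class TwoStageTopN {
        // Return the n largest values of a list, largest first.
        static List<Long> topN(List<Long> data, int n) {
            return data.stream()
                    .sorted(Comparator.reverseOrder())
                    .limit(n)
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            // Pretend each inner list is one input split.
            List<List<Long>> splits = Arrays.asList(
                    Arrays.asList(3L, 99L, 12L, 7L, 41L, 8L, 65L),
                    Arrays.asList(88L, 2L, 73L, 15L, 91L, 30L),
                    Arrays.asList(54L, 100L, 6L, 77L, 21L, 9L));

            // Stage 1 ("map"): each split contributes only its local Top5.
            List<Long> candidates = new ArrayList<>();
            for (List<Long> split : splits) {
                candidates.addAll(topN(split, 5));
            }

            // Stage 2 ("reduce"): the global Top5 over the few surviving candidates.
            System.out.println(topN(candidates, 5)); // [100, 99, 91, 88, 77]
        }
    }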

      Concrete implementation

       The code is easier to read here: https://www.jianshu.com/p/aa75ffcea376 . A sample input and its expected output follow.
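      For reference, a possible topn.txt (an assumption on my part: the mapper parses each whole line as one long, so the file must contain exactly one integer per line):

    10
    30
    5
    88
    42
    7
    99
    16
    57
    23

      With Top5, the job's output is the five largest values; since the reducer iterates TreeMap.values() in ascending key order, they come out as 30, 42, 57, 88, 99.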

      

    package TopN;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    import java.io.IOException;
    import java.util.SortedMap;
    import java.util.TreeMap;
    
    /**
     * Created by Administrator on 2019/4/3.
     */
    public class TopnRunner {
    
        public static class TopnMapper extends
                Mapper<LongWritable, Text, NullWritable, Text> {
            //A sorted collection used to gather the TopN values of this split.
            //Caveat: TreeMap keys are unique, so duplicate input values collapse into one entry.
            //e.g. the line "12" is stored as (12, "12")
            private SortedMap<Long, String> topN = new TreeMap<Long, String>();
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                //Called for every record in the split: insert the value into the SortedMap.

                //Each input line is parsed as one long value.
                Long number = Long.valueOf(value.toString());

                //e.g. the line "10" becomes the entry (10, "10")
                //keep track of the 5 largest values seen in this split
                topN.put(number, value.toString());

                //cap the collection at 5 entries
                if(topN.size() > 5) {
                    //always evict the smallest one
                    topN.remove(topN.firstKey());
                }

                //context.write may only be called once the whole split has been
                //traversed, so emission happens in cleanup() instead
                //context.write();
            }
    
            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
    
                Text mapValue = new Text();
    
                //note: very little data is emitted here; each map task
                //writes at most 5 records, which is what keeps the shuffle small
                for (String tmp:topN.values()) {
                    mapValue.set(tmp);
                    context.write(NullWritable.get(), mapValue);
                }
            }
        }
    
        public static class TopnReducer extends
                Reducer<NullWritable, Text, Text, NullWritable> {
    
            private SortedMap<Long, String> topN = new TreeMap<Long, String>();
            private Text reduceKey = new Text();
    
            @Override
            protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                //all mapper candidates arrive under the single NullWritable key
                for (Text tmp:values) {
                    topN.put(Long.valueOf(tmp.toString()), tmp.toString());

                    //same trick as in the mapper: keep only the 5 largest
                    if (topN.size() > 5) {
                        topN.remove(topN.firstKey());
                    }
                }
    
                //emit the global TopN; TreeMap.values() iterates in ascending key order
                for (String tmp:topN.values()) {
                    reduceKey.set(tmp);
                    context.write(reduceKey, NullWritable.get());
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
            System.setProperty("hadoop.home.dir",
                    "E:\\hadoop-2.6.0-cdh5.15.0"); /* adjust depending on whether the bin directory is needed */
    
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://node1:8020"); /* depends on your version; the reachable port may also be 50070 */
    
            Job job = Job.getInstance(conf);
            job.setJobName("optimized-topn");
            job.setJarByClass(TopnRunner.class);
    
            Path inputPath = new Path("/topn.txt"); /* input file */
            FileInputFormat.addInputPath(job, inputPath);
    
            job.setMapperClass(TopnMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
    
    
            job.setReducerClass(TopnReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
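            // Not in the original post: pin the job to a single reducer so the
            // global TopN is explicit. With a NullWritable key every candidate
            // reaches one reduce call anyway; extra reducers would only write
            // empty output files.
            job.setNumReduceTasks(1);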
    
            Path outputPath = new Path("/AASD"); /* output directory; must not already exist */
            FileOutputFormat.setOutputPath(job, outputPath);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
        }
    }
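      One caveat: TreeMap keys are unique, so duplicate values collapse into a single entry and ties are silently dropped from the TopN. A bounded min-heap keeps duplicates; below is a minimal sketch of such a helper (not from the original post; the class and method names are made up):

    import java.util.PriorityQueue;

    /** Keeps the n largest longs seen so far, duplicates included. */
    public class BoundedTopN {
        private final int n;
        // Min-heap: the smallest retained value sits at the head.
        private final PriorityQueue<Long> heap = new PriorityQueue<>();

        public BoundedTopN(int n) { this.n = n; }

        public void offer(long value) {
            if (heap.size() < n) {
                heap.add(value);
            } else if (value > heap.peek()) {
                heap.poll();   // evict the current minimum
                heap.add(value);
            }
        }

        public Iterable<Long> values() { return heap; }

        public static void main(String[] args) {
            BoundedTopN top5 = new BoundedTopN(5);
            for (long v : new long[]{10, 30, 30, 5, 88, 42, 99, 99, 16}) {
                top5.offer(v);
            }
            // Heap iteration order is unspecified; the contents are 30, 42, 88, 99, 99.
            System.out.println(top5.values());
        }
    }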
    

      
