zoukankan      html  css  js  c++  java
  • hadoop mapreduce求解有序TopN(高效模式)

    1、在map阶段先求解该分片(split)的topN,到reduce阶段再合并求解一次(reduce任务数必须为1,才能得到全局topN);求解过程利用TreeMap的排序特性,不用自己写排序算法。

    2、样本数据,格式类似如下(字段以空白/制表符分隔:第2列为手机号,倒数第3、2列分别为上行、下行流量)

    1 	13682846555	192.168.100.12	www.qq.com	1938	2910	200

    3、code

    3.1 mapper

    /**
     * Map-side stage of the Top-N job: buffers only the N highest-ranked records of this input
     * split and emits them from {@code cleanup}, so each map task ships at most N records to the
     * shuffle instead of its whole split.
     *
     * <p>Each input line is whitespace-separated. Field 1 is the phone number. The third-to-last
     * field is the upstream flow and the second-to-last is the downstream flow. Lines that are too
     * short, or whose flow fields are not integers, are skipped and counted under
     * {@code Counters.MALFORMED}.
     *
     * <p>N is read from the {@code topn.size} job setting and defaults to 10.
     */
    public class TopNMapper extends Mapper<LongWritable, Text, FlowBeanSorted,Text> {
        private static final String TOP_N_KEY = "topn.size";
        private static final int DEFAULT_TOP_N = 10;
        // id, phone, ..., upFlow, downFlow, status: anything shorter cannot be parsed reliably.
        private static final int MIN_FIELDS = 5;

        // Records ranked by FlowBeanSorted's natural ordering (TreeMap keeps its keys sorted).
        // Each key maps to a list because different phones with equal flow compare as the same
        // key; a plain Text value would let the later phone silently overwrite the earlier one.
        private final TreeMap<FlowBeanSorted, java.util.List<Text>> flowMap = new TreeMap<>();
        // Total number of phones currently held across all lists in flowMap.
        private int buffered;
        private int topN = DEFAULT_TOP_N;

        private enum Counters {LINES, MALFORMED}

        /** Reads N from the job configuration ({@code topn.size}, default 10). */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            topN = context.getConfiguration().getInt(TOP_N_KEY, DEFAULT_TOP_N);
        }

        /**
         * Parses one input line and adds it to the bounded, sorted buffer.
         *
         * @param key     byte offset of the line (unused)
         * @param value   one whitespace-separated input record
         * @param context task context, used here only for counters
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.getCounter(Counters.LINES).increment(1);
            // "\\s+" is the regex for "one or more whitespace characters" (spaces and tabs).
            // Written as "\s+", it does not compile before Java 15. From Java 15 on it is a literal
            // space, which would not split the tab-separated sample data.
            String[] fields = value.toString().trim().split("\\s+");
            if (fields.length < MIN_FIELDS) {
                context.getCounter(Counters.MALFORMED).increment(1);
                return;
            }

            long upFlow;
            long downFlow;
            try {
                upFlow = Long.parseLong(fields[fields.length - 3]);
                downFlow = Long.parseLong(fields[fields.length - 2]);
            } catch (NumberFormatException ignored) {
                // A single bad record should not fail the whole task; it stays visible via the counter.
                context.getCounter(Counters.MALFORMED).increment(1);
                return;
            }

            // Fresh objects per record: they are stored in flowMap, so they must not be reused.
            FlowBeanSorted bean = new FlowBeanSorted();
            bean.setAll(upFlow, downFlow);

            java.util.List<Text> phones = flowMap.get(bean);
            if (phones == null) {
                phones = new java.util.ArrayList<>();
                flowMap.put(bean, phones);
            }
            phones.add(new Text(fields[1]));
            buffered++;

            // Keep at most N records in memory.
            if (buffered > topN) {
                evictLowest();
            }
        }

        /**
         * Drops one phone from the last (lowest-ranked) key, and removes the key once its list is
         * empty.
         *
         * <p>NOTE(review): "last key = smallest flow" holds only if FlowBeanSorted.compareTo sorts
         * descending by flow, as the original "remove the smallest" comment assumed. Confirm this
         * against FlowBeanSorted.
         */
        private void evictLowest() {
            java.util.Map.Entry<FlowBeanSorted, java.util.List<Text>> last = flowMap.lastEntry();
            java.util.List<Text> phones = last.getValue();
            phones.remove(phones.size() - 1);
            buffered--;
            if (phones.isEmpty()) {
                flowMap.remove(last.getKey());
            }
        }

        /** Emits the buffered records as (flow bean, phone) pairs in TreeMap order. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (java.util.Map.Entry<FlowBeanSorted, java.util.List<Text>> entry : flowMap.entrySet()) {
                for (Text phone : entry.getValue()) {
                    context.write(entry.getKey(), phone);
                }
            }
        }
    }

    3.2 reducer

    /**
     * Reduce-side stage of the Top-N job: emits the first N (phone, flow) records in key order.
     *
     * <p>Hadoop merge-sorts reducer input by key before calling {@code reduce}. With a single
     * reduce task, the records therefore already arrive ranked, and the first N written are the
     * global Top-N. No in-memory buffer is needed.
     *
     * <p>Previously, a TreeMap keyed by the flow bean was used here. Every value inside one
     * {@code reduce} call shares an equal key, so each {@code put} replaced the previous phone and
     * ties were silently lost. Writing the values directly keeps every phone.
     *
     * <p>Assumptions to confirm: the shuffle sort uses FlowBeanSorted's natural ordering (no custom
     * sort comparator is configured), and the job runs exactly one reduce task. With more than one,
     * each task would emit its own Top-N. N is read from {@code topn.size} and defaults to 10.
     */
    public class TopNReducer extends Reducer<FlowBeanSorted, Text,Text,FlowBeanSorted> {
        private static final String TOP_N_KEY = "topn.size";
        private static final int DEFAULT_TOP_N = 10;

        private int topN = DEFAULT_TOP_N;
        // Number of records this reduce task has written so far.
        private int emitted;

        /** Reads N from the job configuration ({@code topn.size}, default 10). */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            topN = context.getConfiguration().getInt(TOP_N_KEY, DEFAULT_TOP_N);
        }

        /**
         * Writes the phones for one flow value until N records have been emitted in total.
         *
         * @param key     flow bean shared by all {@code values} of this call
         * @param values  phone numbers whose flow equals {@code key}
         * @param context output sink; records are written as (phone, flow bean)
         */
        @Override
        protected void reduce(FlowBeanSorted key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text phone : values) {
                if (emitted >= topN) {
                    return;
                }
                // Hadoop reuses the key/value objects between iterations, but write() serializes
                // immediately, so no defensive copy is needed.
                context.write(phone, key);
                emitted++;
            }
        }
    }

    3.3 driver

    /**
     * Configures and submits the Top-N MapReduce job.
     *
     * <p>Usage: {@code TopNDriver <input> <output>}. When fewer than two arguments are given, it
     * falls back to the local test paths {@code input/phone*.txt} and {@code output/}. Previously,
     * any command-line arguments were unconditionally overwritten with those paths.
     *
     * <p>Warning: an existing output directory is deleted recursively before the job is submitted.
     */
    public class TopNDriver {
        private static final String DEFAULT_INPUT = "input/phone*.txt";
        private static final String DEFAULT_OUTPUT = "output/";

        public static void main(String[] args) throws Exception {
            if (args == null || args.length < 2) {
                args = new String[]{DEFAULT_INPUT, DEFAULT_OUTPUT};
            }

            // Obtain the configuration and a job instance
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);

            // Locate the jar containing this program
            job.setJarByClass(TopNDriver.class);

            // Mapper / Reducer classes for this job
            job.setMapperClass(TopNMapper.class);
            job.setReducerClass(TopNReducer.class);

            // A global Top-N requires every map task's candidates to meet in a single reducer.
            // Set this explicitly so that a cluster default of mapreduce.job.reduces > 1 cannot
            // silently produce several partial Top-N files.
            job.setNumReduceTasks(1);

            // Map output key/value types
            job.setMapOutputKeyClass(FlowBeanSorted.class);
            job.setMapOutputValueClass(Text.class);

            // Final output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBeanSorted.class);

            Path inPath = new Path(args[0]);
            Path outPath = new Path(args[1]);

            // Resolve the file system from the output path itself, so that a fully-qualified
            // path (hdfs://..., file://...) is deleted on the right file system, not the default one.
            FileSystem fs = outPath.getFileSystem(configuration);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }

            FileInputFormat.setInputPaths(job, inPath);
            FileOutputFormat.setOutputPath(job, outPath);

            // Submit the job with its configuration and jar, and wait for it to finish
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }

    }
     
  • 相关阅读:
    【664】日常记录
    【663】dataframe 删掉指定行或者列
    【662】TensorFlow GPU 相关配置
    【661】Python split 指定多个分隔符
    【660】TensorFlow 或者 keras 版本问题
    FFMPEG视音频编解码
    cpplint中filter参数
    升级pip之后出现sys.stderr.write(f“ERROR: {exc}“)
    特征点三角化恢复3D点
    VIO——陀螺仪零偏估计
  • 原文地址:https://www.cnblogs.com/asker009/p/11458823.html
Copyright © 2011-2022 走看看