zoukankan      html  css  js  c++  java
  • MapReduce Design Patterns(2. 中位数、方差)(三)

    http://blog.csdn.net/cuirong1986/article/details/8455335

    Median and standard deviation

    中值和标准差的计算比前面的例子复杂一点。因为这种运算是非关联的,它们不是那么容易的能从combiner中获益。中值是将数据集一分为两等份的数值类型,一份比中值大,一部分比中值小。这需要数据集按顺序完成清洗。数据必须是排序的,但存在一定障碍,因为MapReduce不会根据values排序。

     

    方差告诉我们数据跟平均值之间的差异程度。这就要求我们之前要先找到平均值。执行这种操作最容易的方法是复制值得列表到临时列表,以便找到中值,或者再一次迭代集合所有数据得到标准差。对大的数据量,这种实现可能导致java堆空间的问题,引文每个输入组的每个值都放进内存处理。下一个例子就是针对这种问题的。

     

    问题:给出用户评论,计算一天中每个小时评论长度的中值和标准差。

     

    Mapper codeMapper会处理每条输入记录计算一天内每个小时评论长度的中值(貌似事实不是这样)。输出键是小时,输出值是评论长度。

     

    public static class MedianStdDevMapper extends
            Mapper<Object, Text, IntWritable, IntWritable> {
    
        private IntWritable outHour = new IntWritable();
        private IntWritable outCommentLength = new IntWritable();
        private final static SimpleDateFormat frmt = new SimpleDateFormat(
                "yyyy-MM-dd'T'HH:mm:ss.SSS");
    
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
             Map<String, String> parsed = transformXmlToMap(value.toString());
             String strDate = parsed.get("CreationDate");
             String text = parsed.get("Text");
             Date creationDate = frmt.parse(strDate);
             outHour.set(creationDate.getHours());
             outCommentLength.set(text.length());
             context.write(outHour, outCommentLength);
        }
    }


    Reducer codeReducer会迭代给定值得集合,并把每个值加到内存列表里。同时也会计算一个动态的sumcount。迭代之后,评论长度被排序,以便找出中值。如果数量是偶数,中值是中间两个数的平均值。下面,根据动态的sumcount计算出平均值,然后迭代排序的列表计算出标准差。每个数跟平均值的差的平方累加求和保存在一个动态sum中,这个sum的平方根就是标准差。最后输出key,中值和标准差。

     

    public static class MedianStdDevReducer extends
            Reducer<IntWritable, IntWritable, IntWritable, MedianStdDevTuple> {
    
        private MedianStdDevTuple result = new MedianStdDevTuple();
        private ArrayList<Float> commentLengths = new ArrayList<Float>();
    
        public void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            float sum = 0;
            float count = 0;
            commentLengths.clear();
            result.setStdDev(0);
    // Iterate through all input values for this key
            for (IntWritable val : values) {
                commentLengths.add((float) val.get());
                sum += val.get();
                ++count;
            }
    // sort commentLengths to calculate median
            Collections.sort(commentLengths);
    // if commentLengths is an even value, average middle two elements
            if (count % 2 == 0) {
                result.setMedian((commentLengths.get((int) count / 2 - 1)
                        + commentLengths.get((int) count / 2)) / 2.0f);
            } else {
    // else, set median to middle value
                result.setMedian(commentLengths.get((int) count / 2));
            }
    // calculate standard deviation
            float mean = sum / count;
            float sumOfSquares = 0.0f;
            for (Float f : commentLengths) {
                sumOfSquares += (f - mean) * (f - mean);
            }
            result.setStdDev((float) Math.sqrt(sumOfSquares / (count - 1)));
            context.write(key, result);
        }
    }
    


    Combiner optimization。这种情况下不能用combinerreducer需要所有的值去计算中值和标准差。因为combiner仅仅在一个map本地处理中间键值对。计算完整的中值,和标准值是不可能的。下面的例子是一种复杂一点的使用自定义的combiner的实现。

     

    Memory-conscious median and standard deviation

    下面的例子跟前一个不同,并减少了内存的使用。把值放进列表会导致很多重复的元素。一种去重的方法是标记元素的个数。例如,对于列表< 1, 1, 1, 1, 2, 2, 3,4, 5, 5, 5 >,可以用一个sorted map保存:(14, 22, 31, 41, 53)。核心的原理是一样的:reduce阶段会迭代所有值并放入内存数据结构中。数据结构和搜索的方式是改变的地方。Map很大程度上减少了内存的使用。前一个例子使用list,复杂度为On),n是评论条数,本例使用map,使用键值对,为Omaxm)),m是评论长度的最大值。作为额外的补充,combiner的使用能帮助聚合评论长度的数目,并通过writable对象输出reducer端将要使用的这个map

     

    问题:同前一个。

     

    Mapper codeMapper处理输入记录,输出键是小时,值是sortedmapwritable对象,包含一个元素:评论长度和计数1.这个mapreducercombiner里多处用到。

     

    public static class MedianStdDevMapper extends
            Mapper<lObject, Text, IntWritable, SortedMapWritable> {
    
        private IntWritable commentLength = new IntWritable();
        private static final LongWritable ONE = new LongWritable(1);
        private IntWritable outHour = new IntWritable();
        private final static SimpleDateFormat frmt = new SimpleDateFormat(
                "yyyy-MM-dd'T'HH:mm:ss.SSS");
    
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            Map<String, String> parsed = transformXmlToMap(value.toString());
    // Grab the "CreationDate" field,
    // since it is what we are grouping by
            String strDate = parsed.get("CreationDate");
    // Grab the comment to find the length
            String text = parsed.get("Text");
    // Get the hour this comment was posted in
            Date creationDate = frmt.parse(strDate);
            outHour.set(creationDate.getHours());
            commentLength.set(text.length());
            SortedMapWritable outCommentLength = new SortedMapWritable();
            outCommentLength.put(commentLength, ONE);
    // Write out the user ID with min max dates and count
            context.write(outHour, outCommentLength);
        }
    }


     

    Reducer codeReducer通过迭代上面的map生成一个大的treemapkey是评论长度,value是这个长度的评论的数目。

     

    迭代以后,中值被计算出来。中值的索引由评论总数除以2得出。然后迭代treemapentrySet找到key,需满足条件为:previousCommentCount medianIndex < commentCount,把treeMap的值加到每一步迭代的评论里。一旦条件满足,如果有偶数条评论且中值索引等于前一条评论的,中值取前一个的长度和当前长度的平均值。否则,中值就是当前评论的长度。

     

    接下来,再一次迭代treemap,计算出平方和,确保相关联的评论长度和数目相乘。标准差就根据平方和算出来了。中值和标准差就随着key一块输出。

     

    public static class MedianStdDevReducer extends
            Reducer<IntWritable, SortedMapWritable, IntWritable, MedianStdDevTuple> {
    
        private MedianStdDevTuple result = new MedianStdDevTuple();
        private TreeMap<Integer, Long> commentLengthCounts
                = new TreeMap<Integer, Long>();
    
        public void reduce(IntWritable key, Iterable<SortedMapWritable> values,
                Context context) throws IOException, InterruptedException {
            float sum = 0;
            long totalComments = 0;
            commentLengthCounts.clear();
            result.setMedian(0);
            result.setStdDev(0);
            for (SortedMapWritable v : values) {
                for (Map.Entry<WritableComparable, Writable> entry : v.entrySet()) {
                    int length = ((IntWritable) entry.getKey()).get();
                    long count = ((LongWritable) entry.getValue()).get();
                    totalComments += count;
                    sum += length * count;
                    Long storedCount = commentLengthCounts.get(length);
                    if (storedCount == null) {
                        commentLengthCounts.put(length, count);
                    } else {
                        commentLengthCounts.put(length, storedCount + count);
                    }
                }
            }
            long medianIndex = totalComments / 2L;
            long previousComments = 0;
            long comments = 0;
            int prevKey = 0;
            for (Map.Entry<Integer, Long> entry : commentLengthCounts.entrySet()) {
                comments = previousComments + entry.getValue();
                if (previousComments <= medianIndex && medianIndex < comments) {
                    if (totalComments % 2 == 0 && previousComments == medianIndex) {
                        result.setMedian((float) (entry.getKey() + prevKey) / 2.0f);
                    } else {
                        result.setMedian(entry.getKey());
                    }
                    break;
                }
                previousComments = comments;
                prevKey = entry.getKey();
            }
    // calculate standard deviation
            float mean = sum / totalComments;
            float sumOfSquares = 0.0f;
            for (Map.Entry<Integer, Long> entry : commentLengthCounts.entrySet()) {
                sumOfSquares += (entry.getKey() - mean) * (entry.getKey() - mean)
                        * entry.getValue();
            }
            result.setStdDev((float) Math.sqrt(sumOfSquares / (totalComments - 1)));
            context.write(key, result);
        }
    }
    

     

    Combiner optimization。跟前面的例子不同,这里combiner的逻辑跟reducer不同。Reducer计算中值和标准差,而combiner对每个本地map的中间键值对聚合sortedMapWritable条目。代码解析这些条目并在本地map聚合它们,这跟前面部分的reducer代码是相同的。这里用一个hashmap替换treemap,因为不需要排序,且hashmap更快。Reducer使用map计算中值和标准差,而combiner是用sortedMapWritable序列化为reduce阶段做准备。

     

    public static class MedianStdDevCombiner extends
            Reducer<IntWritable, SortedMapWritable, IntWritable, SortedMapWritable> {
    
        protected void reduce(IntWritable key,
                Iterable<SortedMapWritable> values, Context context)
                throws IOException, InterruptedException {
            SortedMapWritable outValue = new SortedMapWritable();
            for (SortedMapWritable v : values) {
                for (Map.Entry<WritableComparable, Writable> entry : v.entrySet()) {
                    LongWritable count = (LongWritable) outValue.get(entry.getKey());
                    if (count != null) {
                        count.set(count.get()
                                + ((LongWritable) entry.getValue()).get());
                    } else {
                        outValue.put(entry.getKey(), new LongWritable(
                                ((LongWritable) entry.getValue()).get()));
                    }
                }
            }
            context.write(key, outValue);
        }
    }
    


    Data flow diagram。图2-4展示了例子的数据流程图

     

    Figure 2-4. Data flow for the standard deviation example


  • 相关阅读:
    认识一下JavaScript
    JAVA并发容器之CopyOnWrite容器
    JAVA并发容器之ConcurrentHashMap
    由浅入深理解java集合(一)——集合框架 Collection、Map
    强引用、软引用、弱引用、虚引用
    Lock和synchronized的选择
    Java并发编程:volatile关键字解析
    java线程并发-Thread类的使用
    SQL语句中:UNION与UNION ALL的区别
    抽象类与接口
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276270.html
Copyright © 2011-2022 走看看