zoukankan      html  css  js  c++  java
  • MapReduce Design Patterns(3. Top Ten))(六)

    http://blog.csdn.net/cuirong1986/article/details/8469448


    Top Ten

    Pattern Description

    Top ten模式跟前面的有很大的不同,跟输入数据大小无关,最终得到的记录数量是确定的。而在通用filtering中,输出的规模取决于输入数据。

    Intent

    根据数据集的排名,获取相对较小的前K条记录,不管数据量多大。

    Motivation

    在数据分析中,找出离群值是很重要的工作,因为这些记录是典型的最引人关注的独特的数据片。这种模式的关键点是根据指定的规则找到最具代表性的记录,根据这些记录,可能就会找出导致这些记录特殊的原因。如果定义了能决定两条记录之间排名的排名方法或比较方法,就能用这种模式,用MapReduce找出整个数据集的排名最高的值。

     

    这种模式特别引人关注的原因是,比较器是在MapReduce上下文之外实现的。在hql中,你可能更倾向于用排名值排序数据,然后取top k。在MapReduce中,正如下一章要讲的,全局排序是非常复杂的,会使用集群大量的资源。这种模式着眼于不通过排序而去取到有限的high-value 记录。

     

    找出top ten 是一件有趣的事情。Stackoverflow上哪些发帖得分最高?谁是最老的会员?你的网站最大的订单是?哪个中帖子“meow”出现次数最多?

    Applicability

    ·这种模式需要一个能用于两条记录的比较方法。就是说,我们必须能够拿一条记录和另一条记录比较来决定那一条更大。

    ·输出记录的数量应该明显比输入少,其中一个重要原因是,这样应该比全局排序更有意义。

    Structure

    这种模式mapprer reducer都有。Mapper找到本地top K,然后都发送到reducer来找出最终的top K。因为mapper输出的记录最多K条,相对较小,所以仅需要一个reducer。可以看图3-3.

     

     

    class mapper:

    setup():

    initialize top ten sorted list

    map(key, record):

    insert record into top ten sorted list

    if length of array is greater-than 10 then

        truncate list to a length of 10

    cleanup():

    for record in top sorted ten list:

        emit null,record


    class reducer:

    setup():

    initialize top ten sorted list

    reduce(key, records):

    sort records

    truncate records to top 10

    for record in records:

         emit record

    Figure 3-3. The structure of the top ten pattern

    Mapper读每条输入记录,用一个大小为K的数组对象收集值。在mappercleanup阶段,把存在数组里的K条记录作为值,keynull,发送到reducer。这是map任务最低需求的K

     

    我们会在reducer得到K*M条记录,M代表map任务数。在reduce方法,做跟mapper同样的事情。

    我们在每个mapper中要选出top k是要考虑最极端的情况。

    Consequences

    Top k条记录被返回。

    Known uses

    Outlier analysis

    离群值通常需要关注。可能是用户使用你的系统很困难造成的,或者网站的高级用户。用过滤和分组,也可能给你另一种数据集的视图。

    Select interesting data

    如果你能根据某种排序给记录评分,就能找出最有价值的数据。如果你打算跟踪后续处理过程,这就非常有用。例如BI工具或RDB,不能处理大规模的数据。评分规则可以用一些高级的算法设置的很复杂,例如给文本评分,根据语法和拼写的精确度,以此达到删除垃圾数据的目的。

    Catchy dashboards

    这不是一本心理学的书,所以你认为消费者感兴趣的top ten数据,他们就是。这种模式也可以用于网站的一些有趣的top ten的统计,并且可能让用户对数据有更多的思考,或者甚至带来竞争。

    Resemblances

    Sql

    在传统的小的RDB中,排序可能不算什么。这种情况下,可以根据排序的条件获得top tenMapReduce中也可以做同样的事情,但你会在后面的模式中看到,排序是一种代价很高的操作。

    SELECT FROM table ORDER BY col4 DESC LIMIT 10;

     

    Pig

    Pig无论用任何最优的排序,在执行这种查询时会有一些问题。最简单的样式跟sql 查询一样,但仅仅为了取得几条记录,排序代价高。改进这种情形的方法是用java MapReduce代替pig

    B = ORDER A BY col4 DESC;

    C = LIMIT B 10;

    Performance analysis

    Top ten模式的性能是很好的,但有几个重要的局限性。大多数局限性来自于单reducer,不管处理的数据量的大小。

     

    使用这种模式时,我们需要关注的是reducer会接收到多少数据。每个map任务输出K条记录,job会由Mmap任务组成,所以reducer要处理M*K条记录,这可能有点多。

    reducer处理大量数据是不合适的,原因如下:

    ·记录很多时排序代价高,需要用到本地磁盘做大部分的排序,而不是在内存里。

    ·Reducer运行的主机会通过网络接收大量的数据,会造成网络资源热点。

    ·如果有太多的记录要处理,reduce scan数据花费很长的时间。

    ·reducer中排序需要的内存的增长可能导致jvmOOM。例如,在中间过程中把所有的值收集到arrayList里,会导致很大。对于top ten问题,这可能不是特殊的问题,但如果数量太大就会超出内存的限制。

    ·写到输出文件不是并行的。Reduce阶段往本地磁盘写数据是代价高的操作。如果只有一个reducer,就不能利用往多个主机写数据的并行优势,或相同主机上的不同磁盘。这不是top ten本身的问题,但会在数据很大时变成一种制约因素。

     

    如果K值很大,这种模式变得低效。考虑一种极端情况,K被设为五百万,整个数据集为十百万。500万超过了输入分片的大小,所以每个mapper会发送所有的数据到reducerReducer会处理整个数据集,问题是数据到reducer不会并行加载。

     

    一种优化方式是过滤掉一些数据,当你知道某些规则时。设想数据中有个值是只增长,你想找到top 100条记录。第100条记录这个值是52485,然后你就可以过滤掉比这个值小的记录。

     

    基于以上原因,这种模式只适用于K值较小的情况。最多几十,几百。做全局排序是否看起来更有效,这个K的界限是模糊的。

    Top Ten Examples

    Top ten users by reputation

    MapReduce计算top ten记录是有意义的实践。每个mapper决定各自分片的top ten并输出到reducer阶段,来计算最终的top ten。记得配置你的job1reducer

    问题:给出用户信息数据,根据用户的声誉值输出top ten

     

    Mapper codeMapper处理所有输入记录存在treeMap里。TreeMap是按key排序的。Integer的默认排序时升序。如果treeMap超过10个记录,第一条就被移除。所有记录处理后,在cleanup方法里输出给reducer

     

    public static class TopTenMapper extends
            Mapper<Object, Text, NullWritable, Text> {
    // Stores a map of user reputation to the record
    
        private TreeMap<Integer, Text> repToRecordMap = new TreeMap<Integer, Text>();
    
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            Map<String, String> parsed = transformXmlToMap(value.toString());
            String userId = parsed.get("Id");
            String reputation = parsed.get("Reputation");
    // Add this record to our map with the reputation as the key
            repToRecordMap.put(Integer.parseInt(reputation), new Text(value));
    // If we have more than ten records, remove the one with the lowest rep
    // As this tree map is sorted in descending order, the user with
    // the lowest reputation is the last key.
            if (repToRecordMap.size() > 10) {
                repToRecordMap.remove(repToRecordMap.firstKey());
            }
        }
    
        protected void cleanup(Context context) throws IOException {
        }
    }


     

    reducer code。整体上,reducer计算top ten的方式跟mapper相同。使用job.setNumReduceTasks(1),来配置jobreducer1个,并使用nullwritable作为key,这将在reducer只产生一个输入组,包含了所有的top ten 记录。计算完后,把降序排序的结果刷到文件系统中。由于只有一个输入组,所以这一步可以reduce方法完成,也可以在cleanup里做。

     

    public static class TopTenReducer extends
            Reducer<NullWritable, Text, NullWritable, Text> {
    // Stores a map of user reputation to the record
    // Overloads the comparator to order the reputations in descending order
    
        private TreeMap<Integer, Text> repToRecordMap = new TreeMap<Integer, Text>();
    
        public void reduce(NullWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                Map<String, String> parsed = transformXmlToMap(value.toString());
                repToRecordMap.put(Integer.parseInt(parsed.get("Reputation")),
                        new Text(value));
    // If we have more than ten records, remove the one with the lowest rep
    // As this tree map is sorted in descending order, the user with
    // the lowest reputation is the last key.
                if (repToRecordMap.size() > 10) {
                    repToRecordMap.remove(repToRecordMap.firstKey());
                }
            }
            for (Text t : repToRecordMap.descendingMap().values()) {
    // Output our ten records to the file system with a null key
                context.write(NullWritable.get(), t);
            }
        }
    }
    



    Notice:这个job也不需要combiner,虽然从技术上reduce代码可以用作combiner。但是多余的处理。这段代码是找出top ten的固定的代码,但很容易通过在setup方法捕获变量的方式改成求top K记录的程序。只要保证K满足前面讨论的问题,别太大。

     

     

    Distinct

    Pattern Description

    这种模式过滤整个数据集,在处理相似的记录时面临挑战。最终的输出是剔重后的所有记录。

    Intent

    在包含很多相似记录的数据中做剔重。

    Motivation

    减少数据集使其成为一个有唯一值得集合有几种用法。一种是剔重。在大数据集里,重复或极其相似的记录会带来很麻烦的问题。重复记录占用大量的空间或倾斜top-level分析的结果。例如,每次某人访问你的网站,你收集他所用的浏览器和设备来进行营销分析。如果用户访问次数超过一次,你会记录多次。如果你要计算各种浏览器在用户中使用所占的百分比,用户的访问次数会扭曲真实的结果。因此,首先要对数据去重,保证一台设备一个日志事件一条记录。

     

    记录没有必要再原生形式上精确的认为相同。只需要翻译成一种能认为相同的形式即可认为相同。例如,根据http server 日志做web浏览分析,抽取用户名,设备,和用户使用的浏览器。我们并不关心浏览时间,或者来自哪台http server

    Applicability

    只需要你的数据集有重复的值的数据。当然不是必须的,如果本身没有重复值,傻瓜才会用这种模式。

    Structure

    这种模式在MapReduce的使用上显得非常平滑。利用MapReduce能把key按分组聚到一起的功能来删除重复数据。Mapper转换数据,reducer做的工作也不多。如果重复数据太多,也可以考虑使用combiner。重复的记录经常挨着,所以combiner可以在map阶段去重。

    map(key, record):

    emit record,null

    reduce(key, records):

    emit key

     

    mapper在每个分片上去重,去重的原则只根据那三个字段。然后输出到reducer,记录作为keynull作为value

    Reducer根据key分组valuevaluenull,不分析。因为是按key分组的,所以只需要简单的输出key就可保证结果唯一。

    这种模式一个好的特性是reducer的数量按计算量自己决定,设置reducers的数量相对较多一点,因为mapper几乎把所有数据都发送到reducer

    Notice:这是调整数据文件大小的最好时机。想让输出文件大一点,就减少reduce个数,反之一样。不管怎样输出文件的大小是相同的,归功于partitioner的随机hashing

    Consequences

    输出记录保证是唯一的,没使用任何相关命令是因为使用了框架默认配置。

    Known uses

    Deduplicate data

    如果你从几个数据源采集数据,并出现相同的事件存在两次,你就能用这种模式去重。

    Getting distinct values

    如果你的原生记录可能不存在重复值,但提取出来的信息可能有重复的。也可以使用。

    Protecting from an inner join explosion

    如果你要做两个数据集之间的inner join,外键还不是唯一的,后果可能会获取巨大数量的记录。例如,在一个数据集有3000个相同的key,另一个数据集有2000个也是这个相同的key,最后你会得到600000条记录,都会发送到reducer。在用这种模式时,要处理连接key保证它们唯一来减轻这个问题。

    Resemblances

    SqlSELECT DISTINCT FROM table;

    Pigb = DISTINCT a;

    Performance analysis

    理解这种模式的性能分析对高效的使用很重要。主要考虑的问题是如何设置MapReduce job reduce的个数。这主要看mapper输出的记录条数和字节大小。这也能看出combiner能剔除多少数据。如果在一个分片内重复值很少(combiner不会有太大的作用),大多数数据都会发到reduce阶段。

    当一个程序在跑时,通过jobTrackerjob的信息可以看到输出的字节数和记录数。可以用字节数除以reducer的个数计算相应值。这个值决定了每个reducer接受的字节大小,倾斜情况不计算。一个reducer能处理的数据量依不同的调度而不同,但通常不要超过几百兆。当然也不能太小,出现过多的小文件,并不会加速reduce的执行。这个数据量一般要比分片大。

    由于所有的数据都要发送到reduceer,要使用相对较多的reducer个数运行job。不管是一个reducer对应100mapper,还是一个reducer对应2mapperjob都会完成。要理论联系实践找到最佳的reducer个数。通常,要想reducer时间减半,就double reducer的数量,但要小心文件不要太小。

    Notice:选择job reducer数量时,要考虑到集群配置的slots数量。开始用distinct模式可以在数据量合理时设置为跟slots数量相近,数据量大时配成两倍slots数量。

    Distinct Examples

    Distinct user IDs

    去重集合数据是一个展示MapReduce性能的很好的例子。因为每个reducer面对一个唯一key和与其相关的值得集合,为了去重,只需要输出key

     

    问题:给出用户评论数据,根据用户id去重。

    Mapper codeMapper从每条输入记录拿到用户id,作为keyvaluenull,输出到reducer

     

    public static class DistinctUserMapper extends
            Mapper<Object, Text, Text, NullWritable> {
    
        private Text outUserId = new Text();
    
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            Map<String, String> parsed = transformXmlToMap(value.toString());
    // Get the value for the UserId attribute
            String userId = parsed.get("UserId");
    // Set our output key to the user's id
            outUserId.set(userId);
    // Write the user's id with a null value
            context.write(outUserId, NullWritable.get());
        }
    }


     

    Reducer code。大量的体力活都由MapReduce框架来完成。

     

    public static class DistinctUserReducer extends
            Reducer<Text, NullWritable, Text, NullWritable> {
    
        public void reduce(Text key, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
    // Write the user's id with a null value
            context.write(key, NullWritable.get());
        }
    }

    Combiner optimizationCombiner可以使用,现在map本地去重,减少网络ioreduce代码可用于combiner


  • 相关阅读:
    Serialization and deserialization are bottlenecks in parallel and distributed computing, especially in machine learning applications with large objects and large quantities of data.
    Introduction to the Standard Directory Layout
    import 原理 及 导入 自定义、第三方 包
    403 'Forbidden'
    https://bitbucket.org/ariya/phantomjs/downloads/phantomjs-2.1.1-linux-x86_64.tar.bz2
    These interactions can be expressed as complicated, large scale graphs. Mining data requires a distributed data processing engine
    mysqldump --flush-logs
    mysql dump 参数
    mysql dump 参数
    如果是在有master上开启了该参数,记得在slave端也要开启这个参数(salve需要stop后再重新start),否则在master上创建函数会导致replaction中断。
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276263.html
Copyright © 2011-2022 走看看