  • Understanding mapper-reducer through groupby

    Note: by the time reduce runs, the shuffle has already been done.

    http://zheming.wang/blog/2015/05/19/3AFF5BE8-593C-4F76-A72A-6A40FB140D4D/

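    Simply put, before a reduce task runs, it keeps pulling the final output of every map task and keeps merging the data pulled from those different places, eventually producing a single file that serves as the reduce task's input. The reducer part of the figure below may be misleading...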
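
The pull-and-merge step described above can be sketched in a few lines. This is only an illustration of the merge idea, assuming each map task has already produced a sorted run of (word, count) pairs; the sample data and the helper name `merge_map_outputs` are made up for the example, and real Hadoop also partitions by key and spills to disk, which is not modelled here.

```python
import heapq

# Hypothetical outputs of three map tasks, each already sorted by key.
map_outputs = [
    [("apple", 1), ("cat", 1), ("cat", 1)],
    [("apple", 1), ("dog", 1)],
    [("banana", 1), ("cat", 1)],
]


def merge_map_outputs(outputs):
    """Merge already-sorted runs into one sorted list (the reducer's input)."""
    # heapq.merge assumes every input run is sorted and merges them lazily.
    return list(heapq.merge(*outputs))


merged = merge_map_outputs(map_outputs)
for word, count in merged:
    print("%s\t%d" % (word, count))
```

After the merge, all pairs that share a key sit next to each other, which is what the reducer code in this post relies on.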

    mapper.py

    #!/usr/bin/env python
    """mapper.py"""
    
    import sys
    
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
        # split the line into words
        words = line.split()
        # increase counters
        for word in words:
            # write the results to STDOUT (standard output);
            # what we output here will be the input for the
            # Reduce step, i.e. the input for reducer.py
            #
            # tab-delimited; the trivial word count is 1
            print('%s\t%s' % (word, 1))

    reducer.py   

    #!/usr/bin/env python
    """reducer.py"""
    
    import sys
    
    current_word = None
    current_count = 0
    word = None
    
    # input comes from STDIN
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
    
        # parse the input we got from mapper.py and convert
        # count (currently a string) to int
        try:
            word, count = line.split('\t', 1)
            count = int(count)
        except ValueError:
            # line had no tab or count was not a number,
            # so silently ignore/discard this line
            continue
    
        # this IF-switch only works because Hadoop sorts map output
        # by key (here: word) before it is passed to the reducer
        if current_word == word:
            current_count += count
        else:
            if current_word is not None:
                # write result to STDOUT
                print('%s\t%s' % (current_word, current_count))
            current_count = count
            current_word = word
    
    # do not forget to output the last word if there was any input!
    if current_word is not None:
        print('%s\t%s' % (current_word, current_count))
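
To see why the IF-switch depends on sorted input, the same running-total logic can be wrapped in a function and fed a list instead of STDIN. This is a sketch for local experimentation only; the function name `reduce_lines` and the sample lines are assumptions, not part of the original scripts.

```python
def reduce_lines(lines):
    """Apply the running-total logic of reducer.py to a list of lines."""
    results = []
    current_word, current_count = None, 0
    for line in lines:
        word, count = line.strip().split("\t", 1)
        count = int(count)
        if current_word == word:
            current_count += count
        else:
            # key changed: emit the finished key and start a new total
            if current_word is not None:
                results.append((current_word, current_count))
            current_word, current_count = word, count
    if current_word is not None:
        results.append((current_word, current_count))
    return results


unsorted_lines = ["cat\t1", "dog\t1", "cat\t1"]
unsorted_result = reduce_lines(unsorted_lines)
sorted_result = reduce_lines(sorted(unsorted_lines))
print(unsorted_result)  # "cat" is emitted twice, once per run
print(sorted_result)    # each word is emitted exactly once
```

With unsorted input the word "cat" shows up as two separate totals, because the logic only ever compares a line with the one before it.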

    Improved Mapper and Reducer code: using Python iterators and generators

    mapper.py

    #!/usr/bin/env python
    """A more advanced Mapper, using Python iterators and generators."""
    
    import sys
    
    def read_input(file):
        for line in file:
            # split the line into words
            yield line.split()
    
    def main(separator='\t'):
        # input comes from STDIN (standard input)
        data = read_input(sys.stdin)
        for words in data:
            # write the results to STDOUT (standard output);
            # what we output here will be the input for the
            # Reduce step, i.e. the input for reducer.py
            #
            # tab-delimited; the trivial word count is 1
            for word in words:
                print('%s%s%d' % (word, separator, 1))
    
    if __name__ == "__main__":
        main()

    reducer.py   

    #!/usr/bin/env python
    """A more advanced Reducer, using Python iterators and generators."""
    
    from itertools import groupby
    from operator import itemgetter
    import sys
    
    def read_mapper_output(file, separator='\t'):
        for line in file:
            yield line.rstrip().split(separator, 1)
    
    def main(separator='\t'):
        # input comes from STDIN (standard input)
        data = read_mapper_output(sys.stdin, separator=separator)
        # groupby groups multiple word-count pairs by word,
        # and creates an iterator that returns consecutive keys and their group:
        #   current_word - string containing a word (the key)
        #   group - iterator yielding all ["<current_word>", "<count>"] items
        for current_word, group in groupby(data, itemgetter(0)):
            try:
                total_count = sum(int(count) for _, count in group)
                print("%s%s%d" % (current_word, separator, total_count))
            except ValueError:
                # count was not a number, so silently discard this item
                pass
    
    if __name__ == "__main__":
        main()
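
The link between groupby and the shuffle can be checked locally without Hadoop. The sketch below is an assumption-laden stand-in: `sorted()` plays the role of the shuffle/sort phase, and the function names `map_words` and `reduce_counts` and the sample text are invented for the example. It relies on the documented behaviour of `itertools.groupby`, which only groups consecutive items with the same key.

```python
from itertools import groupby
from operator import itemgetter


def map_words(lines):
    """Mapper: emit a (word, 1) pair for every word."""
    for line in lines:
        for word in line.split():
            yield word, 1


def reduce_counts(pairs):
    """Reducer: sum the counts of each run of consecutive equal keys."""
    for word, group in groupby(pairs, key=itemgetter(0)):
        yield word, sum(count for _, count in group)


text = ["the cat saw the dog", "the dog ran"]
pairs = list(map_words(text))

# Without sorting, groupby starts a new group every time the key changes.
without_sort = list(reduce_counts(pairs))
# sorted() stands in for Hadoop's shuffle/sort between map and reduce.
with_sort = list(reduce_counts(sorted(pairs)))

print(without_sort)
print(with_sort)
```

Without the sort, "the" is reported three times with a count of 1 each; with it, every word appears once with its full total. That is the sense in which the reducer can be understood through groupby: it is a group-by over input that the framework has already sorted.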
  • Original article: https://www.cnblogs.com/TMatrix52/p/11619614.html