zoukankan      html  css  js  c++  java
  • 分布式统计的思考以及实现

    分布式统计的思考以及实现

    在展开描述之前,先看个简单的例子,假设现有这样一组数据

    Index A B C
    0 a1 b1 c1
    1 a2 b1 c2
    2 a1 b1 c3

    需求为这样:

    以A,B作为分组字段,对C去重后求和

    那么,针对上述的数据源,则结果表为:

    A B Sum_C
    a1 b1 2 #c1, c3
    a2 b1 1 #c2

    计算过程大概为这样:

    # 0号数据进入,取出C值c1

    #1号数据,A和B的组合与0号不匹配,则生成新的组

    #2号进入,与0号匹配,且C值不重复,总和为2

    上述的计算过程我们通常会在各种数据库中见到,例如MS SQLServer或者Mongodb等等,在数据库中的计算都有明显的缺陷:

    * 单点式

    * 无法做实时计算

    而且对于mongodb来说(很久没接触关系型数据库了,就不献丑了),数据量的增大以及数据表的增加对于性能是一个非常大的影响,对内存的需求会非常之高,从成本以及性能的角度考虑,我们需要一个可分布式的算法以及实现过程

    那么,我们再来回顾刚才的计算过程:

    * 对A、B字段的组合分组可以看作一个计算hash的操作

    * 对C字段的去重求和也可以看作一个大的hashSet去重的操作

    * 对于新的数据进入,重复计算hash的过程

    OK,除了计算hash的过程,还应该有存储hash值的设施,很显然,redis最为合适

    那么,如何实现呢? ( 以下以python作为实现语言)

    我们知道,在python以及js这种语言中,可以很方便的用dict表示一条数据记录,例如:

    1
    {'A': 'a1', 'B': 'b1', 'C': 'c1'}

    那么,所有的记录操作都是针对dict对象进行的,以下将给出一段代码片段,第二部分将对实现过程做具体的描述

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    def __do_aggerate(self, _2nd_k, op, _1st_k = None):
        assert callable(op)
         
        _ = self.__aggerates.get(_2nd_k)
        if _:
            return _
         
        _1st_k = self.__id if _1st_k is None else _1st_k
        try:
            self.__r.watch(_1st_k)
            _ = op(_1st_k, _2nd_k)
        except WatchError, e:
            log.fatal('transaction fail: {0}'.format(_1st_k))
        finally:
            self.__r.unwatch()
             
        self.__aggerates.update({_2nd_k: _})
        return _
         
         
    def __cal_grpkey(self, src):
        '''计算分组对应的key
        '''
        grp_key = {}
        for f in self.__groupby_fields:
            ok, value = self.sf_parser.unwind(f, src)
            if not ok:
                return False, None
            grp_key.update({f: value})
             
        return True, grp_key
     
     
    def group_distinct_sum(self, src, *unique_fields):
        assert src and isinstance(src, dict), src
         
        ok, grp_key = self.__cal_grpkey(src)
        if not ok:
            return ok, None
         
        r_key = grp_key.copy()
        for u in unique_fields:
            _ = src.get(u)
            if _ is None:
                return False, None
            r_key.update({u: _})
             
        def __(h, k):
            self.__r.hset(h, k, 1)
            #self.__r即redis对象
            return self.__r.hlen(h)
             
        h_key = '_u:{0}:{1}:{2}'.format(self.__id,
            ':'.join(unique_fields),
            ':'.join((self.__safe_str(v) for v in grp_key.values())))
        u_key = hashlib.md5(cPickle.dumps(r_key)).hexdigest()
         
        _ = self.__do_aggerate(u_key, __, h_key)
        return True, (grp_key, _)
    1
     

    如上代码即完成了上文描述的操作:

    * 计算分组字段的值

    * 对多个分组字段计算hash

    * 对聚集字段(即文章开始的C)作求和操作,调用redis对象的hset和hlen完成求和过程

    更详细的,完整的实现过程将在第二部分中阐述

    在展开描述之前,先看个简单的例子,假设现有这样一组数据

    Index A B C
    0 a1 b1 c1
    1 a2 b1 c2
    2 a1 b1 c3

    需求为这样:

    以A,B作为分组字段,对C去重后求和

    那么,针对上述的数据源,则结果表为:

    A B Sum_C
    a1 b1 2 #c1, c3
    a2 b1 1 #c2

    计算过程大概为这样:

    # 0号数据进入,取出C值c1

    #1号数据,A和B的组合与0号不匹配,则生成新的组

    #2号进入,与0号匹配,且C值不重复,总和为2

    上述的计算过程我们通常会在各种数据库中见到,例如MS SQLServer或者Mongodb等等,在数据库中的计算都有明显的缺陷:

    * 单点式

    * 无法做实时计算

    而且对于mongodb来说(很久没接触关系型数据库了,就不献丑了),数据量的增大以及数据表的增加对于性能是一个非常大的影响,对内存的需求会非常之高,从成本以及性能的角度考虑,我们需要一个可分布式的算法以及实现过程

    那么,我们再来回顾刚才的计算过程:

    * 对A、B字段的组合分组可以看作一个计算hash的操作

    * 对C字段的去重求和也可以看作一个大的hashSet去重的操作

    * 对于新的数据进入,重复计算hash的过程

    OK,除了计算hash的过程,还应该有存储hash值的设施,很显然,redis最为合适

    那么,如何实现呢? ( 以下以python作为实现语言)

    我们知道,在python以及js这种语言中,可以很方便的用dict表示一条数据记录,例如:

    1
    {'A': 'a1', 'B': 'b1', 'C': 'c1'}

    那么,所有的记录操作都是针对dict对象进行的,以下将给出一段代码片段,第二部分将对实现过程做具体的描述

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    def __do_aggerate(self, _2nd_k, op, _1st_k = None):
        assert callable(op)
         
        _ = self.__aggerates.get(_2nd_k)
        if _:
            return _
         
        _1st_k = self.__id if _1st_k is None else _1st_k
        try:
            self.__r.watch(_1st_k)
            _ = op(_1st_k, _2nd_k)
        except WatchError, e:
            log.fatal('transaction fail: {0}'.format(_1st_k))
        finally:
            self.__r.unwatch()
             
        self.__aggerates.update({_2nd_k: _})
        return _
         
         
    def __cal_grpkey(self, src):
        '''计算分组对应的key
        '''
        grp_key = {}
        for f in self.__groupby_fields:
            ok, value = self.sf_parser.unwind(f, src)
            if not ok:
                return False, None
            grp_key.update({f: value})
             
        return True, grp_key
     
     
    def group_distinct_sum(self, src, *unique_fields):
        assert src and isinstance(src, dict), src
         
        ok, grp_key = self.__cal_grpkey(src)
        if not ok:
            return ok, None
         
        r_key = grp_key.copy()
        for u in unique_fields:
            _ = src.get(u)
            if _ is None:
                return False, None
            r_key.update({u: _})
             
        def __(h, k):
            self.__r.hset(h, k, 1)
            #self.__r即redis对象
            return self.__r.hlen(h)
             
        h_key = '_u:{0}:{1}:{2}'.format(self.__id,
            ':'.join(unique_fields),
            ':'.join((self.__safe_str(v) for v in grp_key.values())))
        u_key = hashlib.md5(cPickle.dumps(r_key)).hexdigest()
         
        _ = self.__do_aggerate(u_key, __, h_key)
        return True, (grp_key, _)
    1
     

    如上代码即完成了上文描述的操作:

    * 计算分组字段的值

    * 对多个分组字段计算hash

    * 对聚集字段(即文章开始的C)作求和操作,调用redis对象的hset和hlen完成求和过程

    更详细的,完整的实现过程将在第二部分中阐述

     
     
    分类: Python
     
  • 相关阅读:
    Codeforces Round #417 C. Sagheer and Nubian Market
    linux 终端抓包命令
    计算机网络体系结构分析
    排序算法-快速排序
    排序算法-堆排序
    排序算法-希尔排序
    排序算法-插入排序
    排序算法-冒泡排序
    排序算法-选择排序
    杂谈:终端小工具
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3035553.html
Copyright © 2011-2022 走看看