分布式统计的思考以及实现
在展开描述之前,先看个简单的例子,假设现有这样一组数据
Index | A | B | C |
0 | a1 | b1 | c1 |
1 | a2 | b1 | c2 |
2 | a1 | b1 | c3 |
需求为这样:
以A,B作为分组字段,对C去重后求和
那么,针对上述的数据源,则结果表为:
A | B | Sum_C |
a1 | b1 | 2 #c1, c3 |
a2 | b1 | 1 #c2 |
计算过程大概为这样:
# 0号数据进入,取出C值c1
#1号数据,A和B的组合与0号不匹配,则生成新的组
#2号进入,与0号匹配,且C值不重复,总和为2
上述的计算过程我们通常会在各种数据库中见到,例如MS SQLServer或者Mongodb等等,在数据库中的计算都有明显的缺陷:
* 单点式
* 无法做实时计算
而且对于mongodb来说(很久没接触关系型数据库了,就不献丑了),数据量的增大以及数据表的增加对于性能是一个非常大的影响,对内存的需求会非常之高,从成本以及性能的角度考虑,我们需要一个可分布式的算法以及实现过程
那么,我们再来回顾刚才的计算过程:
* 对A、B字段的组合分组可以看作一个计算hash的操作
* 对C字段的去重求和也可以看作一个大的hashSet去重的操作
* 对于新的数据进入,重复计算hash的过程
OK,除了计算hash的过程,还应该有存储hash值的设施,很显然,redis最为合适
那么,如何实现呢? ( 以下以python作为实现语言)
我们知道,在python以及js这种语言中,可以很方便的用dict表示一条数据记录,例如:
1
|
{ 'A' : 'a1' , 'B' : 'b1' , 'C' : 'c1' } |
那么,所有的记录操作都是针对dict对象进行的,以下将给出一段代码片段,第二部分将对实现过程做具体的描述
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
def __do_aggerate( self , _2nd_k, op, _1st_k = None ): assert callable (op) _ = self .__aggerates.get(_2nd_k) if _: return _ _1st_k = self .__id if _1st_k is None else _1st_k try : self .__r.watch(_1st_k) _ = op(_1st_k, _2nd_k) except WatchError, e: log.fatal( 'transaction fail: {0}' . format (_1st_k)) finally : self .__r.unwatch() self .__aggerates.update({_2nd_k: _}) return _ def __cal_grpkey( self , src): '''计算分组对应的key ''' grp_key = {} for f in self .__groupby_fields: ok, value = self .sf_parser.unwind(f, src) if not ok: return False , None grp_key.update({f: value}) return True , grp_key def group_distinct_sum( self , src, * unique_fields): assert src and isinstance (src, dict ), src ok, grp_key = self .__cal_grpkey(src) if not ok: return ok, None r_key = grp_key.copy() for u in unique_fields: _ = src.get(u) if _ is None : return False , None r_key.update({u: _}) def __(h, k): self .__r.hset(h, k, 1 ) #self.__r即redis对象 return self .__r.hlen(h) h_key = '_u:{0}:{1}:{2}' . format ( self .__id, ':' .join(unique_fields), ':' .join(( self .__safe_str(v) for v in grp_key.values()))) u_key = hashlib.md5(cPickle.dumps(r_key)).hexdigest() _ = self .__do_aggerate(u_key, __, h_key) return True , (grp_key, _) |
1
|
|
如上代码即完成了上文描述的操作:
* 计算分组字段的值
* 对多个分组字段计算hash
* 对聚集字段(即文章开始的C)作求和操作,调用redis对象的hset和hlen完成求和过程
更详细的,完整的实现过程将在第二部分中阐述
在展开描述之前,先看个简单的例子,假设现有这样一组数据
Index | A | B | C |
0 | a1 | b1 | c1 |
1 | a2 | b1 | c2 |
2 | a1 | b1 | c3 |
需求为这样:
以A,B作为分组字段,对C去重后求和
那么,针对上述的数据源,则结果表为:
A | B | Sum_C |
a1 | b1 | 2 #c1, c3 |
a2 | b1 | 1 #c2 |
计算过程大概为这样:
# 0号数据进入,取出C值c1
#1号数据,A和B的组合与0号不匹配,则生成新的组
#2号进入,与0号匹配,且C值不重复,总和为2
上述的计算过程我们通常会在各种数据库中见到,例如MS SQLServer或者Mongodb等等,在数据库中的计算都有明显的缺陷:
* 单点式
* 无法做实时计算
而且对于mongodb来说(很久没接触关系型数据库了,就不献丑了),数据量的增大以及数据表的增加对于性能是一个非常大的影响,对内存的需求会非常之高,从成本以及性能的角度考虑,我们需要一个可分布式的算法以及实现过程
那么,我们再来回顾刚才的计算过程:
* 对A、B字段的组合分组可以看作一个计算hash的操作
* 对C字段的去重求和也可以看作一个大的hashSet去重的操作
* 对于新的数据进入,重复计算hash的过程
OK,除了计算hash的过程,还应该有存储hash值的设施,很显然,redis最为合适
那么,如何实现呢? ( 以下以python作为实现语言)
我们知道,在python以及js这种语言中,可以很方便的用dict表示一条数据记录,例如:
1
|
{ 'A' : 'a1' , 'B' : 'b1' , 'C' : 'c1' } |
那么,所有的记录操作都是针对dict对象进行的,以下将给出一段代码片段,第二部分将对实现过程做具体的描述
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
def __do_aggerate( self , _2nd_k, op, _1st_k = None ): assert callable (op) _ = self .__aggerates.get(_2nd_k) if _: return _ _1st_k = self .__id if _1st_k is None else _1st_k try : self .__r.watch(_1st_k) _ = op(_1st_k, _2nd_k) except WatchError, e: log.fatal( 'transaction fail: {0}' . format (_1st_k)) finally : self .__r.unwatch() self .__aggerates.update({_2nd_k: _}) return _ def __cal_grpkey( self , src): '''计算分组对应的key ''' grp_key = {} for f in self .__groupby_fields: ok, value = self .sf_parser.unwind(f, src) if not ok: return False , None grp_key.update({f: value}) return True , grp_key def group_distinct_sum( self , src, * unique_fields): assert src and isinstance (src, dict ), src ok, grp_key = self .__cal_grpkey(src) if not ok: return ok, None r_key = grp_key.copy() for u in unique_fields: _ = src.get(u) if _ is None : return False , None r_key.update({u: _}) def __(h, k): self .__r.hset(h, k, 1 ) #self.__r即redis对象 return self .__r.hlen(h) h_key = '_u:{0}:{1}:{2}' . format ( self .__id, ':' .join(unique_fields), ':' .join(( self .__safe_str(v) for v in grp_key.values()))) u_key = hashlib.md5(cPickle.dumps(r_key)).hexdigest() _ = self .__do_aggerate(u_key, __, h_key) return True , (grp_key, _) |
1
|
|
如上代码即完成了上文描述的操作:
* 计算分组字段的值
* 对多个分组字段计算hash
* 对聚集字段(即文章开始的C)作求和操作,调用redis对象的hset和hlen完成求和过程
更详细的,完整的实现过程将在第二部分中阐述