自从MongoDB被越来越多的大型关键项目采用后,数据分析也成为了越来越重要的话题。人们似乎已经厌倦了使用不同的软件来进行分析(这都利用到了Hadoop),因为这些方法往往需要大规模的数据传输,而这些成本相当昂贵。
MongoDB提供了2种方式来对数据进行分析:Map Reduce(以下简称MR)和聚合框架(Aggregation Framework)。MR非常灵活且易于使用,它可以很好地与分片(sharding)结合使用,并允许大规模输出。尽管在MongoDB v2.4版本中,由于JavaScript引擎从Spider切换到了V8,使得MR的性能有了大幅改进,但是与Agg Framework(使用C++)相比,MR的速度还是显得比较慢。本文就来看看,有哪些方法可以让MR的速度有所提升。
测试
首先我们来做个测试,插入1000万文档,这些文档中包含了介于0和100万之间的单一整数值,这意味着,平均每10个文档具有相同的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
> for ( var i
= 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });} > db.uniques.findOne() { "_id" :
ObjectId( "51d3c386acd412e22c188dec" ), "dim0" :
570859 } > db.uniques.ensureIndex({dim0: 1}) > db.uniques.stats() { "ns" : "test.uniques" , "count" :
10000000, "size" :
360000052, "avgObjSize" :
36.0000052, "storageSize" :
582864896, "numExtents" :
18, "nindexes" :
2, "lastExtentSize" :
153874432, "paddingFactor" :
1, "systemFlags" :
1, "userFlags" :
0, "totalIndexSize" :
576040080, "indexSizes" :
{ "_id_" :
324456384, "dim0_1" :
251583696 }, "ok" :
1 } |
这里我们想要得到文档中唯一值的计数,可以通过下面的MR任务来轻松完成:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
> db.runCommand( { mapreduce: "uniques" , map: function ()
{ emit( this .dim0,
1); }, reduce: function (key,
values) { return Array.sum(values);
}, out: "mrout" }) { "result" : "mrout" , "timeMillis" :
1161960, "counts" :
{ "input" :
10000000, "emit" :
10000000, "reduce" :
1059138, "output" :
999961 }, "ok" :
1 } |
正如你看到的,输出结果大约需要1200秒(在EC2 M3实例上测试),共输出了1千万maps、100万reduces、999961个文档。结果类似于:
1
2
3
4
5
6
7
8
9
10
11
12
|
> db.mrout.find() { "_id" :
1, "value" :
10 } { "_id" :
2, "value" :
5 } { "_id" :
3, "value" :
6 } { "_id" :
4, "value" :
10 } { "_id" :
5, "value" :
9 } { "_id" :
6, "value" :
12 } { "_id" :
7, "value" :
5 } { "_id" :
8, "value" :
16 } { "_id" :
9, "value" :
10 } { "_id" :
10, "value" :
13 } ... |
下面就来看看如何进行优化。
使用排序
我在之前的这篇文章中简要说明了使用排序对于MR的好处,这是一个鲜为人知的特性。在这种情况下,如果处理未排序的输入,意味着MR引擎将得到随机排序的值,
基本上没有机会在RAM中进行reduce,相反,它将不得不通过一个临时collection来将数据写回磁盘,然后按顺序读取并进行reduce。
下面来看看如果使用排序,会有什么帮助:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
> db.runCommand( { mapreduce: "uniques" , map: function ()
{ emit( this .dim0,
1); }, reduce: function (key,
values) { return Array.sum(values);
}, out: "mrout" , sort: {dim0: 1} }) { "result" : "mrout" , "timeMillis" :
192589, "counts" :
{ "input" :
10000000, "emit" :
10000000, "reduce" :
1000372, "output" :
999961 }, "ok" :
1 } |
现在时间降到了192秒,速度提升了6倍。其实reduces的数量是差不多的,但是它们在被写入磁盘之前已经在RAM中完成了。
使用多线程
在MongoDB中,一个单一的MR任务并不能使用多线程——只有在多个任务中才能使用多线程。但是目前的多核CPU非常有利于在单一服务器上进行并行化工作,就像Hadoop。我们需要做的是,将输入数据分割成若干块,并为每个块分配一个MR任务。splitVector命令可以帮助你非常迅速地找到分割点,如果你有更简单的分割方法更好。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
> db.runCommand({splitVector: "test.uniques" ,
keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000}) { "timeMillis" :
6006, "splitKeys" :
[ { "dim0" :
18171 }, { "dim0" :
36378 }, { "dim0" :
54528 }, { "dim0" :
72717 }, … { "dim0" :
963598 }, { "dim0" :
981805 } ], "ok" :
1 } |
从1千万文档中找出分割点,使用splitVector命令只需要大约5秒,这已经相当快了。所以,下面我们需要做的是找到一种方式来创建多个MR任务。从应用服务器方面来说,使用多线程和$gt / $lt查询命令会非常方便。从shell方面来说,可以使用ScopedThread对象,它的工作原理如下:
1
2
3
|
> var t
= new ScopedThread(mapred,
963598, 981805) > t.start() > t.join() |
现在我们可以放入一些JS代码,这些代码可以产生4个线程,下面来等待结果显示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
> var res
= db.runCommand({splitVector: "test.uniques" ,
keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 }) > var keys
= res.splitKeys > keys.length 39 > var mapred
= function (min,
max) { return db.runCommand({
mapreduce: "uniques" , map: function ()
{ emit( this .dim0,
1); }, reduce: function (key,
values) { return Array.sum(values);
}, out: "mrout" +
min, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) } > var numThreads
= 4 > var inc
= Math.floor(keys.length / numThreads) + 1 > threads = []; for ( var i
= 0; i < numThreads; ++i) { var min
= (i == 0) ? 0 : keys[i * inc].dim0; var max
= (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print( "min:" +
min + " max:" +
max); var t
= new ScopedThread(mapred,
min, max); threads.push(t); t.start() } min:0 max:274736 min:274736 max:524997 min:524997 max:775025 min:775025 max:{ "$maxKey" :
1 } connecting to: test connecting to: test connecting to: test connecting to: test > for ( var i in threads)
{ var t
= threads[i]; t.join(); printjson(t.returnData()); } { "result" : "mrout0" , "timeMillis" :
205790, "counts" :
{ "input" :
2750002, "emit" :
2750002, "reduce" :
274828, "output" :
274723 }, "ok" :
1 } { "result" : "mrout274736" , "timeMillis" :
189868, "counts" :
{ "input" :
2500013, "emit" :
2500013, "reduce" :
250364, "output" :
250255 }, "ok" :
1 } { "result" : "mrout524997" , "timeMillis" :
191449, "counts" :
{ "input" :
2500014, "emit" :
2500014, "reduce" :
250120, "output" :
250019 }, "ok" :
1 } { "result" : "mrout775025" , "timeMillis" :
184945, "counts" :
{ "input" :
2249971, "emit" :
2249971, "reduce" :
225057, "output" :
224964 }, "ok" :
1 } |
第1个线程所做的工作比其他的要多一点,但时间仍达到了190秒,这意味着多线程并没有比单线程快!
使用多个数据库
这里的问题是,线程之间存在太多锁争用。当锁时,MR不是非常无私(每1000次读取会进行yield)。由于MR任务做了大量写操作,线程之间结束时会等待彼此。由于MongoDB的每个数据库都有独立的锁,那么让我们来尝试为每个线程使用不同的输出数据库:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
> var mapred
= function (min,
max) { return db.runCommand({
mapreduce: "uniques" , map: function ()
{ emit( this .dim0,
1); }, reduce: function (key,
values) { return Array.sum(values);
}, out: { replace: "mrout" +
min, db: "mrdb" +
min }, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } } }) } > threads = []; for ( var i
= 0; i < numThreads; ++i) { var min
= (i == 0) ? 0 : keys[i * inc].dim0; var max
= (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print( "min:" +
min + " max:" +
max); var t
= new ScopedThread(mapred,
min, max); threads.push(t); t.start() } min:0 max:274736 min:274736 max:524997 min:524997 max:775025 min:775025 max:{ "$maxKey" :
1 } connecting to: test connecting to: test connecting to: test connecting to: test > for ( var i in threads)
{ var t
= threads[i]; t.join(); printjson(t.returnData()); } ... { "result" :
{ "db" : "mrdb274736" , "collection" : "mrout274736" }, "timeMillis" :
105821, "counts" :
{ "input" :
2500013, "emit" :
2500013, "reduce" :
250364, "output" :
250255 }, "ok" :
1 } ... |
所需时间减少到了100秒,这意味着与一个单独的线程相比,速度约提高2倍。尽管不如预期,但已经很不错了。在这里,我使用了4个核心,只提升了2倍,如果使用8核CPU,大约会提升4倍。
使用纯JavaScript模式
在线程之间分割输入数据时,有一些非常有趣的东西:每个线程只拥有约25万主键来输出,而不是100万。这意味着我们可以使用“纯JS模式”——通过jsMode:true来启用。开启后,MongoDB不会在JS和BSON之间反复转换,相反,它会从内部的一个50万主键的JS字典来reduces所有对象。下面来看看该操作是否对速度提升有帮助。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
> var mapred
= function (min,
max) { return db.runCommand({
mapreduce: "uniques" , map: function ()
{ emit( this .dim0,
1); }, reduce: function (key,
values) { return Array.sum(values);
}, out: { replace: "mrout" +
min, db: "mrdb" +
min }, sort: {dim0: 1}, query: { dim0: { $gte: min, $lt: max } }, jsMode: true })
} > threads = []; for ( var i
= 0; i < numThreads; ++i) { var min
= (i == 0) ? 0 : keys[i * inc].dim0; var max
= (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print( "min:" +
min + " max:" +
max); var t
= new ScopedThread(mapred,
min, max); threads.push(t); t.start() } min:0 max:274736 min:274736 max:524997 min:524997 max:775025 min:775025 max:{ "$maxKey" :
1 } connecting to: test connecting to: test connecting to: test connecting to: test > for ( var i in threads)
{ var t
= threads[i]; t.join(); printjson(t.returnData()); } ... { "result" :
{ "db" : "mrdb274736" , "collection" : "mrout274736" }, "timeMillis" :
70507, "counts" :
{ "input" :
2500013, "emit" :
2500013, "reduce" :
250156, "output" :
250255 }, "ok" :
1 } ... |
现在时间降低到70秒。看来jsMode确实有帮助,尤其是当对象有很多字段时。该示例中是一个单一的数字字段,不过仍然提升了30%。
MongoDB v2.6版本中的改进
在MongoDB v2.6版本的开发中,移除了一段关于在JS函数调用时的一个可选“args”参数的代码。该参数是不标准的,也不建议使用,它由于历史原因遗留了下来(见SERVER-4654)。让我们从Git库中pull最新的MongoDB并编译,然后再次运行测试用例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
... { "result" :
{ "db" : "mrdb274736" , "collection" : "mrout274736" }, "timeMillis" :
62785, "counts" :
{ "input" :
2500013, "emit" :
2500013, "reduce" :
250156, "output" :
250255 }, "ok" :
1 } ... |
从结果来看,时间降低到了60秒,速度大约提升了10-15%。同时,这种更改也改善了JS引擎的整体堆消耗量。
结论
回头来看,对于同样的MR任务,与最开始时的1200秒相比,速度已经提升了20倍。这种优化应该适用于大多数情况,即使一些技巧效果不那么理想(比如使用多个输出dbs /集合)。但是这些技巧可以帮助人们来提升MR任务的速度,未来这些特性也许会更加易用——比如,这个ticket 将会使splitVector命令更加可用,这个ticket将会改进同一数据库中的多个MR任务。