map reduce的解释
这是一张来自mongodb-mapreduce图示,比较能说明问题

其实我们可以从word count这个实例来理解MapReduce。MapReduce大体上分为六个步骤:input, split, map, shuffle, reduce, output。细节描述如下:
- 输入(input):如给定一个文档,包含如下四行:
Hello Java
Hello C
Hello Java
Hello C++ - 拆分(split):将上述文档中每一行的内容转换为key-value对,即:
0 - Hello Java
1 - Hello C
2 – Hello Java
3 - Hello C++ - 映射(map):将拆分之后的内容转换成新的key-value对,即:
(Hello , 1)
(Java , 1)
(Hello , 1)
(C , 1)
(Hello , 1)
(Java , 1)
(Hello , 1)
(C++ , 1) - 派发(shuffle):将key相同的扔到一起去,即:
(Hello , 1)
(Hello , 1)
(Hello , 1)
(Hello , 1)
(Java , 1)
(Java , 1)
(C , 1)
(C++ , 1)
注意:这一步需要移动数据,原来的数据可能在不同的datanode上,这一步过后,相同key的数据会被移动到同一台机器上。最终,它会返回一个list包含各种k-value对,即:
{ Hello: 1,1,1,1}
{Java: 1,1}
{C: 1}
{C++: 1} - 缩减(reduce):把同一个key的结果加在一起。如:
(Hello , 4)
(Java , 2)
(C , 1)
(C++,1) - 输出(output): 输出缩减之后的所有结果。
{
"_id" : ObjectId("5a79391534cdbd692825e978"),
"cdn" : "Conversant",
"domain" : "7img1.xxxx.com",
"status_code" : {
"200" : 80,
"206" : 3,
"404" : 2,
"304" : 4
}
}
使用下面语句对status_code各种key进行统计
db.getCollection('log_coll').mapReduce(
function(){
var codes = this.status_code;
Object.keys(codes).forEach(function(k){
emit(k, codes[k]);
})
},
function(k, v){
return Array.sum(v);
},
{
out : {inline : 1},
query: {}
}
)
也可以只显示状态为200的条目
db.getCollection('log_coll').mapReduce(
function(){
var codes = this.status_code;
Object.keys(codes).forEach(function(k){
if(codes[k].id=="200"){
emit(k, codes[k]);
}
})
},
function(k, v){
return Array.sum(v);
},
{
out : {inline : 1},
query: {}
}
)
多级对象如果判断各级对象是否存在
db.getCollection('client_accounts').mapReduce(
function(){
if(this.client!=undefined){
if(this.client.employees!=undefined) {
var codes = this.client.employees;
Object.keys(codes).forEach(function(k){
emit(k, codes[k]);
})
}
}
},
function(k, v){
},
{
out : {inline : 1},
query: {}
}
)
下面看多条件分组的mapreduce实现
我将多个条件拼接在一起方便查看,正式环境时可以使用js对象。
db.customerWorkloadTotal.mapReduce(
function() {
emit(this.salespersonId + "_" + this.customerId, this);
},
function(key, values) {
tagIntention = 0;
signCustomerNums = 0;
trackContactNums = 0;
values.forEach(function(v) {
if (v.tagIntention > 0) {
tagIntention = v.tagIntention;
}
if (v.signCustomerNums > 0) {
signCustomerNums = 1;
}
if (v.trackContactNums > 0) {
trackContactNums = 1;
}
});
return { "tagIntention": tagIntention, "signCustomerNums": signCustomerNums, "trackContactNums": trackContactNums };
},
{
query: { totalDate: { $gte: "2018-10-31" } },
sort: { totalDate: -1 },
out: { inline: 1 }
}
).find()