zoukankan      html  css  js  c++  java
  • MongoDB 聚合操作(转)

    在MongoDB中,有两种方式计算聚合:Pipeline 和 MapReduce。Pipeline查询速度快于MapReduce,但是MapReduce的强大之处在于能够在多台Server上并行执行复杂的聚合逻辑。MongoDB不允许Pipeline的单个聚合操作占用过多的系统内存,如果一个聚合操作消耗20%以上的内存,那么MongoDB直接停止操作,并向客户端输出错误消息。

    一,使用 Pipeline 方式计算聚合

    Pipeline 方式使用db.collection.aggregate()函数进行聚合运算,运算速度较快,操作简单,但是,Pipeline方式有两个限制:单个聚合操作消耗的内存不能超过20%,聚合操作返回的结果集必须限制在16MB以内。

    创建示例数据,在集合 foo中插入1000条doc,每个doc中有三个field:idx,name 和 age。

    for(i=0;i<10000;i++)
    { 
      db.foo.insert({"idx":i,name:"user "+i,age:i%90});
    }

    1,使用$match 管道符过滤collection中doc,使符合条件的doc进入pipeline,能够减少聚合操作消耗的内存,提高聚合的效率。

    db.foo.aggregate({$match:{age:{$lte:25}}})

    2,使用$project 管道符,使用doc中的部分field进入下级pipeline

    db.foo.aggregate(
    {$match:{age:{$lte:25}}}, 
    {$project:{age:1,idx:1,"_id":0}} 
    )

    $project 管道符的作用是选择字段,重命名字段,派生字段。 

    2.1 选择字段

    在$project 管道符中,field:1/0,表示选择/不选择 field;将无用的字段从pipeline中过滤掉,能够减少聚合操作对内存的消耗。

    db.foo.aggregate(
    {$match:{age:{$lte:25}}}, 
    {$project:{age:1,idx:1,"_id":0}} 
    )

    2.2 对字段重命名,产生新的字段

    引用符$,格式是:"$field",表示引用doc中 field 的值,如果要引用内嵌 doc中的字段,使用 "$field1.filed2",表示引用内嵌文档field1中的字段:field2的值。

    示例,新建一个field:preIdx,其值和idx 字段的值是相同的。

    db.foo.aggregate(
    {$match:{age:{$lte:25}}}, 
    {$project:{age:1,"preIdx":"$idx",idx:1,"_id":0}} 
    )

    2.3 派生字段

    在$project中,对字段进行计算,根据doc中的字段值和表达式,派生一个新的字段。

    示例,preIdx是根据当前doc的idx 减1 得到的

    db.foo.aggregate(
    {$match:{age:{$lte:25}}}, 
    {$project:
         {
        age:1,
        "preIdx":{$subtract:["$idx",1]},
        idx:1,
        "_id":0}
         } 
    )
     

    在$project 执行算术运算的操作符:+($add),*($multiply),/($divide),%($mod),-($subtract)。

    对于字符数据,$substr:[expr,start,length]用于求子字符串;$concat:[expr1,expr2,,,exprn],用于将表达式连接在一起;$toLower:expr 和 $toUpper:expr用于返回expr的小写或大写形式。

    2.4 分组操作

    使用$group将doc按照特定的字段的值进行分组,$group将分组字段的值相同的doc作为一个分组进行聚合计算。如果没有$group 管道符,那么所有doc作为一个分组。对每一个分组,都能根据业务逻辑需要计算特定的聚合值。分组操作和排序操作都是非流式的运算符,流式运算符是指:只要有新doc进入,就可以对doc进行处理,而非流式运算符是指:必须等收到所有的文档之后,才能对文档进行处理。分组运算符的处理方式是等接收到所有的doc之后,才能对doc进行分组,然后将各个分组发送给pipeline的下一个运算符进行处理。

    示例,按照age进行分组,统计每个分组中的doc数量

    db.foo.aggregate(
    {$match:{age:{$lte:25}}}, 
    {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} ,
    {$group:{"_id":"$age",count:{$sum:1}}}
    )

    如果分组字段有多个,按照 age 和 age2 进行分组,这样做仅仅是为了演示,在实际的产品环境中,可以使用更多的字段用来分组。

    db.foo.aggregate(
    {$match:{age:{$lte:25}}}, 
    {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} ,
    {$group:{"_id":{age:"$age",age2:"$age"},count:{$sum:1}}}
    )

    对每个分组进行聚合运算,count字段是计算每个分组中doc的数量,idxTotal字段是计算每个分组中idx字段值的加和,idxMax字段是计算每个分组中idx字段值的最大值,idxFirst是计算每个分组中第一个idx 字段的值,不一定是最小的。

     
    db.foo.aggregate(
    {$match:{age:{$lte:25}}}, 
    {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} ,
    {$group:
       {
        "_id":{age:"$age",age2:"$age"},
        count:{$sum:1},
        idxTotal:{$sum:"$idx"}},
        idxMax:{$max:"$idx"},
        idxFirst:{$first:"$idx"}
       }
    } )
     

    2.5,sort操作,limit操作 和 skip操作
    对聚合操作的结果进行排序,然后跳过前10个doc,取剩余结果集的前10个doc。

     
    db.foo.aggregate(
    {$match:{age:{$lte:25}}}, 
    {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} ,
    {$group:
       {
        "_id":{age:"$age",age2:"$age"},
        count:{$sum:1},
        idxTotal:{$sum:"$idx"}},
        idxMax:{$max:"$idx"},
        idxFirst:{$first:"$idx"}
       }
    },
    {$sort:{age:-1}},
    {$skip:10},
    {$limit:10}
    )
     

    二,使用MapReduce 方式计算聚合
    MapReduce 能够计算非常复杂的聚合逻辑,非常灵活,但是,MapReduce非常慢,不应该用于实时的数据分析中。MapReduce能够在多台Server上并行执行,每台Server只负责完成一部分wordload,最后将wordload发送到Master Server上合并,计算出最终的结果集,返回客户端。

    MapReduce分为两个阶段:Map和Reduce,举个例子说明,有10节车厢,统计这10节车厢中男生和女生的数量。串行方式一节一节车厢的统计,直到统计完全部车厢中的人数:男50人,女40人。

    使用MapReduce方式的思路是:每个车厢派一个人去统计,每个人返回一个doc,例如,keyN:{female:num1,male:num2},keyN是车厢编号,在同一时间,有10个人在同时工作,每个人只完成全部workload的10%,很快,返回10个doc,从Key1到Key10,只需要将这10个doc中 femal 和 male分别加和到一起,就是全部车厢的人数:男50人,女40人。

     

    使用MapReduce方式计算聚合,主要分为三步:Map,Shuffle(拼凑)和Reduce,Map和Reduce需要显式定义,shuffle由MongoDB来实现。

    • Map:将操作映射到每个doc,产生Key和Value,例如,Map一个doc,产生(female,{count:1}),female是Key,value是{count:1}
    • Shuffle:按照Key进行分组,并将key相同的Value组合成数组,例如,产生(female,[{count:1},{count:1},{count:1},{count:1},,,,,])
    • Reduce:把Value数组化简为单值,例如,产生(femal,{count:21})

    使用MapReduce进行聚合运算的最佳方式是聚合运算的结果能够加到一起,例如,求最大值/最小值,sum,平均值(转换为计算每台Server的 总和sum1,sum2,,,sumN 与 num1,num2,,numN,平均值avg=(sum1+sum2+,,,+sumN)/(num1+num2+,,+numN))等。

    示例,使用MapReduce模拟Count,统计集合中的doc的数量

    step1,定义Map函数和reduce函数

    对于每个doc,直接返回key 和 一个doc:{count:1}

     
    map=function (){
    for(var key in this)
    {
      emit(key,{count:1});
    }
    }
    
    reduce=function (key,emits){
    total=0;
    for(var i in emits){
      total+=emits[i].count;
    }
    return {"count":total};
    }
     

    step2,执行MapReduce运算
    在集合 foo上执行MapReduce运算,返回mr 对象

     
    mr=db.runCommand(
    {
    "mapreduce":"foo",
    "map":map,
    "reduce":reduce,
    out:"Count Doc"
    })
     

    step3,查看MapReduce计算的结果

    db[mr.result].find()

    示例2,统计集合foo中不同age的数量

    step1,定义Map 和 Reduce函数

    Map函数的作用是对每个doc进行一次映射,返回age 和 {count:1};

    经过Shuffle,每个age都有一个列表:[{count:1},{count:1},{count:1},{count:1},,,,,],有多少个不同的age,MongoDB都会调用多少次Reduce函数,每次调用时,Key值是不同的。

    Reduce函数的作用:对MongoDB的一次调用,对age对应的列表进行聚合运算。

     
    map=function ()
    {
    emit(this.age,{count:1});
    }
    
    reduce= function (key,emits)
    {
    total=0;
    for(var i in emits)
    {
       total+=emits[i].count;
    }
    
    return {"age":key,count:total};
    }
     

    step2,执行MapReduce聚合运算

     
    mr=db.runCommand(
    {
    "mapreduce":"foo",
    "map":map,
    "reduce":reduce,
    out:"Count Doc"
    })
     

    step3,查看聚合运算的结果

    db[mr.result].find()

    示例3,研究reduce函数的特性

    reduce函数具有累加的特性,通过多次调用,能够产生最终的累加值,例如,以下reduce函数对于任意一个特定的key,reduce都能计算key的数量

    reduce= function (key,emits)
    {
    total=0;
    for(var i in emits)
    {
       total+=emits[i].count;
    }
    
    return {"key":key,count:total};
    }

    调用示例:传递的Key是相同的,都是“x”,每个emits都是一个数组,反复调用reduce函数,最终获得key的累加值。

    r1=reduce("x",[{count:1},{count:2}])
    r2=reduce("x",[{count:3},{count:5}])
    r3=reduce("x",[r1,r2])

  • 相关阅读:
    POJ 1795 DNA Laboratory
    CodeForces 303B Rectangle Puzzle II
    HDU 2197 本源串
    HDU 5965 扫雷
    POJ 3099 Go Go Gorelians
    CodeForces 762D Maximum path
    CodeForces 731C Socks
    HDU 1231 最大连续子序列
    HDU 5650 so easy
    大话接口隐私与安全 转载
  • 原文地址:https://www.cnblogs.com/sandea/p/10479155.html
Copyright © 2011-2022 走看看