zoukankan      html  css  js  c++  java
  • MongoDB/聚合/MR

    管道与Aggregation:

    文档结构如下:

    { "_id" : 1, "item" : "abc", "price" : 10, "quantity" : 2, "date" : ISODate("2014-03-01T08:00:00Z") }
    { "_id" : 2, "item" : "jkl", "price" : 20, "quantity" : 1, "date" : ISODate("2014-03-01T09:00:00Z") }
    { "_id" : 3, "item" : "xyz", "price" : 5, "quantity" : 10, "date" : ISODate("2014-03-15T09:00:00Z") }
    { "_id" : 4, "item" : "xyz", "price" : 5, "quantity" : 20, "date" : ISODate("2014-04-04T11:21:39.736Z") }
    { "_id" : 5, "item" : "abc", "price" : 10, "quantity" : 10, "date" : ISODate("2014-04-04T21:23:13.331Z") }
    

    例子:

    db.sales.aggregate([
      # 由上到下,分阶段的进行,注意该数组中的顺序是有意义的
      {
        $project:{item:1,price:1,quantity:1} # 1.取出什么元素待操作;
      },
      {
        $group:{ # 2. 对已取出的元素进行聚合运算;
          _id:"$item", # 根据什么来分组
          quantityCount:{$sum:'$quantity'},
          priceTotal:{$sum:'$price'}
        }
      },
      {
        $sort:{
          quantityCount:1 #3.升序
        }
      },
    
      # 4.基于上面的结果,取倒数第二名
      {
        $skip: 2
      },
      {
        $limit:1
      },
    
      # 5.然后把结果写到result集合中
      {
        $out:'result'
      }
    ])
    
    #表达式$month,$dayOfMonth,$year,$sum,$avg
    db.sales.aggregate(
       [
          {
            $group : {
               _id : { month: { $month: "$date" }, day: { $dayOfMonth: "$date" }, year: { $year: "$date" } }, #按月日年分组
               totalPrice: { $sum: { $multiply: [ "$price", "$quantity" ] } },
               averageQuantity: { $avg: "$quantity" },
               count: { $sum: 1 }
            }
          }
       ]
    )
    
    #结果
    { "_id" : { "month" : 3, "day" : 15, "year" : 2014 }, "totalPrice" : 50, "averageQuantity" : 10, "count" : 1 }
    { "_id" : { "month" : 4, "day" : 4, "year" : 2014 }, "totalPrice" : 200, "averageQuantity" : 15, "count" : 2 }
    { "_id" : { "month" : 3, "day" : 1, "year" : 2014 }, "totalPrice" : 40, "averageQuantity" : 1.5, "count" : 2 }
    
    #
    #
    # 表达式$push
    db.sales.aggregate(
       [
         {
           $group:
             {
               _id: { day: { $dayOfYear: "$date"}, year: { $year: "$date" } },
               itemsSold: { $push:  { item: "$item", quantity: "$quantity" } }
             }
         }
       ]
    )
    
    # result
    {
       "_id" : { "day" : 46, "year" : 2014 },
       "itemsSold" : [
          { "item" : "abc", "quantity" : 10 },
          { "item" : "xyz", "quantity" : 10 },
          { "item" : "xyz", "quantity" : 5 },
          { "item" : "xyz", "quantity" : 10 }
       ]
    }
    {
       "_id" : { "day" : 34, "year" : 2014 },
       "itemsSold" : [
          { "item" : "jkl", "quantity" : 1 },
          { "item" : "xyz", "quantity" : 5 }
       ]
    }
    {
       "_id" : { "day" : 1, "year" : 2014 },
       "itemsSold" : [ { "item" : "abc", "quantity" : 2 } ]
    }
    
    #
    #
    # 表达式$addToSet
    db.sales.aggregate(
       [
         {
           $group:
             {
               _id: { day: { $dayOfYear: "$date"}, year: { $year: "$date" } },
               itemsSold: { $addToSet: "$item" }
             }
         }
       ]
    )
    
    #result
    { "_id" : { "day" : 46, "year" : 2014 }, "itemsSold" : [ "xyz", "abc" ] }
    { "_id" : { "day" : 34, "year" : 2014 }, "itemsSold" : [ "xyz", "jkl" ] }
    { "_id" : { "day" : 1, "year" : 2014 }, "itemsSold" : [ "abc" ] }
    
    #
    #
    # 表达式 $first
    db.sales.aggregate(
       [
         { $sort: { item: 1, date: 1 } },
         {
           $group:
             {
               _id: "$item",
               firstSalesDate: { $first: "$date" }
             }
         }
       ]
    )
    
    # result
    { "_id" : "xyz", "firstSalesDate" : ISODate("2014-02-03T09:05:00Z") }
    { "_id" : "jkl", "firstSalesDate" : ISODate("2014-02-03T09:00:00Z") }
    { "_id" : "abc", "firstSalesDate" : ISODate("2014-01-01T08:00:00Z") }
    

    MapReduce:
    文档结构如下:

    > db.sourceData.findOne()
    {
        "id": 0,
        "name": "Leanne Flinn",
        "email": "leanne.flinn@unilogic.com",
        "work": "Unilogic",
        "dob": "Sun Mar 14 1909 12:45:53 GTM+0530 (LST)",
        "age": 27,
        "gender": "male",
        "salary": 16660,
        "hobbies": "Acrobatics,Photography,Papier-Mache",
        "_id": Object("57579f702fa6c7651e504fe2")
    }
    > db.sourceData.count()
    9999
    

    例子1:计算男女数量
    Mapper 的逻辑
    我们只需要让 Mapper 以性别作为 key,把值作为 1。因为一个用户不是男就是女。所以,Mapper 的输出会是下面这样:
    Key Value
    Male [1,1,1…]
    Female [1,1,1,1,1…]

    Reducer 的逻辑
    在 Reducer 中,我们会获得上面两行数据,我们要做的是把每一行中的值求和,表示该性别的总数。最终的输出结果如下:
    Key Value
    Male 5031
    Female 4968

    mapper = function () {
        emit(this.gender, 1);
    };
     
    reducer = function(gender, count){
        return Array.sum(count);
    };
    在第2行中, this 表示当前的文档,因此 this.gender 会作为 mapper 的 key,它的值要么是 male,要么是 female。而 emit() 将会把数据发送到一个临时保存数据的地方,作为 mapper 的结果。
    在第5行中,我们简单地把每个性别的所有值加起来。
    
    最后,加上执行逻辑:
    
    db.sourceData.mapReduce(
        mapper,
        reducer,
        {
            out : "example1_results"
        }
    );
    db.example1_results.find()
    在第5行中,我们设置了输出的集合名。
    在第9行中,我们会从 example1_results 集合取得结果并显示它。
    

    例子2:获取每个性别中最老和最年轻的人
    Mapper 的逻辑
    在 mapper 中,我们要以性别作为 key,然后以 object 作为 value。这个 object 要包含用户的年龄和名字。年龄是用来做计算用的,而名字只是用来显示给人看的。
    Key Value
    Male [{age: 9, name: ‘John’}, …]
    Female [{age: 19, name: ‘Rita’}, …]

    Reducer 的逻辑
    我们的 reducer 会比前一个例子要复杂一点。我们要检查所有和性别相关的年龄,找到年龄最大和最小的用户。最终的输出结果是这样的:
    Key Value
    Male {min: {name: ‘harry’, age: 1}, max: {name: ‘Alex’, age: 99} }
    Female {min: {name: ‘Loli’, age: 10}, max: {name: ‘Mary’, age: 98} }

    mapper = function () {
        var x = {age : this.age, name : this.name};
        emit(this.gender, {min : x , max : x});
    };
     
    reducer = function(key, values){
        var res = values[0];
        for (var i = 1; i < values.length; i++) {
            if(values[i].min.age < res.min.age)
                res.min = {name : values[i].min.name, age : values[i].min.age};
            if (values[i].max.age > res.max.age) 
               res.max = {name : values[i].max.name, age : values[i].max.age};
        };
        return res;
    };
    
    db.sourceData.mapReduce(
        mapper,
        reducer,
        {
            out : "example2_results"
        }
     );
    
    在第6行,我们构建了一个 object,把它作为 value 发送。
    在第13-18行,我们迭代了所有 object,检查当前的 object 的年龄是否大于或小于前一个 object 的年龄,如果是,就会更新 res.max 或者 res.min。
    

    例子3:计算每种兴趣爱好的人数
    在我们最后的例子中,我们会看看有多少用户有相同的兴趣爱好。
    每个用户的兴趣爱好列表都用逗号分隔。我们会找出有多少用户有表演杂技的爱好等等。

    Mapper 的逻辑
    在这个场景下,我们的 mapper 会复杂一点。我们要为每个用户的兴趣爱好发送一个新的 key-value 对。这样,每个用户的每个兴趣爱好都会触发一次计算。最终我们会得到如下的结果:
    Key Value
    Acrobatics [1,1,1,1,1,1,….]
    Meditation [1,1,1,1,1,1,….]
    Music [1,1,1,1,1,1,….]
    Photography [1,1,1,1,1,1,….]
    Papier-Mache [1,1,1,1,1,1,….]

    Reducer 的逻辑
    在这里,我们只要简单地为每种兴趣爱好求和就好了。最终我们会得到下面的结果:
    Key Value
    Acrobatics 6641
    Meditation 3338
    Music 3338
    Photography 3303
    Papier-Mache 6661

    mapper = function () {
         var hobbys = this.hobbies.split(',');
          for (i in hobbys) {
            emit(hobbys[i], 1);
        }
    };
     
    reducer = function (key, values) {
        var count = 0;
        for (index in values) {
            count += values[index];
        }
     
        return count;
    };
     
     
    db.sourceData.mapReduce(
        mapper,
        reducer,
        {
            out : "example3_results"
        }
     );
    
    注意第7-9行,我们迭代了每个兴趣爱好,然后发送了一次记数。
    第13-18行可以用 Array.sum(values) 来代替,这样是另外一种做相同事情的方式。最终我们得到的结果:
    [ { _id: 'Acrobatics', value: 6641 },
      { _id: 'Meditation', value: 3338 },
      { _id: 'Music', value: 3338 },
      { _id: 'Photography', value: 6661 },
      { _id: 'Papier-Mache', value: 3303 } ]
    

    参考:
    https://scarletsky.github.io/2016/06/12/mapreduce-in-mongodb/
    https://segmentfault.com/a/1190000004263347



    作者:bluebule
    链接:https://www.jianshu.com/p/88c30797b956
    来源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
  • 相关阅读:
    使用linux下的C操作SQLLITE
    s3c6410下移植sqlite3.7.8
    sqlite3在Linux下的安装和使用
    Linux下如何查看哪些进程占用的CPU内存资源最多
    查看LINUX进程内存占用情况
    ssh免密码登陆及其原理
    搭建zookeeper和Kafka集群
    HTTP 错误码
    time 命令
    Shell 运算相关
  • 原文地址:https://www.cnblogs.com/zping/p/11198407.html
Copyright © 2011-2022 走看看