zoukankan      html  css  js  c++  java
  • C#使用MapReduce实现对分片数据的分组

    事由:mongodb已经进行数据分片,这样就不能使用一些方法就不能使用,例如eval,$group如果尝试使用mongodb会提示

    Error: {
        "ok" : 0,
        "errmsg" : "Error: Error: can't use sharded collection from db.eval @:2:9
    ",
        "code" : 2,
        "codeName" : "BadValue"
    } :

    错误原因:分片服务端不支持单服务器实例方法

    经过查找,分片服务器的查询和操作只能使用MapReduce或者Aggregate(聚合管道)操作,这两个mongodb的高级操作可以完成mongo的几乎所有操作。

    需求:查询collection当天满足指定条件的数据,并根据数据进行每个小时的pv,uv,ui统计

    collection中存放浏览器访问记录

    首先我们先使用mongdb的语句把查询语句写出来,这个很关键。如果不是很了解,就去解读一下官方文档吧,一言难尽。

    db.runCommand({
        mapreduce:'visits',   
        query:{ 'sh.si':100650, _id: { $gte: ObjectId('59931a800000000000000000') }},
        map:function(){        
                    emit(                   
                        {hours:this._id.getTimestamp().getHours()},{pv:1,ip:this.ip,ui:this.ui,hours:this._id.getTimestamp().getHours()}        
                        );
                    },
        reduce:function(k,values){
            var data = {
                hours:0,
                pv: 0,
                ip:[],
                ui:[]                        
            };        
            values.forEach(function (v) {              
                data.hours=v.hours;
                data.pv+=v.pv;
                if(v.ui)
                data.ui.push(v.ui);
                 if(v.ip)
                data.ip.push(v.ip);
                });        
            return data;           
            },
            finalize:function(key,v){
                v.hours=NumberInt(v.hours);
                v.pv=NumberInt(v.pv);          
                if(v.ui)
                v.ui=NumberInt(ArrayGroup(v.ui).uni - 1);
                else
                v.ui=0; 
                if(v.ip)
                v.ip=NumberInt(ArrayGroup(v.ip).uni - 1);
                else
                v.ip=0;
                return v;
                },
            out:{inline:1}
        });

    执行之后会得到这样的结果

    /* 1 */
    {
        "results" : [ 
            {
                "_id" : {
                    "hours" : 1.0
                },
                "value" : {
                    "pv" : 1,
                    "ip" : 0,
                    "ui" : 0.0,
                    "hours" : 1
                }
            }, 
            {
                "_id" : {
                    "hours" : 4.0
                },
                "value" : {
                    "pv" : 1,
                    "ip" : 0,
                    "ui" : 0.0,
                    "hours" : 4
                }
            }, 
            {
                "_id" : {
                    "hours" : 5.0
                },
                "value" : {
                    "pv" : 1,
                    "ip" : 0,
                    "ui" : 0.0,
                    "hours" : 5
                }
            }, 
            {
                "_id" : {
                    "hours" : 6.0
                },
                "value" : {
                    "hours" : 6,
                    "pv" : 4,
                    "ip" : 0,
                    "ui" : 0
                }
            }, 
            {
                "_id" : {
                    "hours" : 7.0
                },
                "value" : {
                    "pv" : 1,
                    "ip" : 0,
                    "ui" : 0,
                    "hours" : 7
                }
            }, 
            {
                "_id" : {
                    "hours" : 8.0
                },
                "value" : {
                    "hours" : 8,
                    "pv" : 8,
                    "ip" : 5,
                    "ui" : 4
                }
            }, 
            {
                "_id" : {
                    "hours" : 9.0
                },
                "value" : {
                    "hours" : 9,
                    "pv" : 10,
                    "ip" : 3,
                    "ui" : 0
                }
            }, 
            {
                "_id" : {
                    "hours" : 10.0
                },
                "value" : {
                    "hours" : 10,
                    "pv" : 9,
                    "ip" : 5,
                    "ui" : 2
                }
            }, 
            {
                "_id" : {
                    "hours" : 11.0
                },
                "value" : {
                    "hours" : 11,
                    "pv" : 17,
                    "ip" : 9,
                    "ui" : 1
                }
            }, 
            {
                "_id" : {
                    "hours" : 12.0
                },
                "value" : {
                    "hours" : 12,
                    "pv" : 20,
                    "ip" : 8,
                    "ui" : 2
                }
            }, 
            {
                "_id" : {
                    "hours" : 13.0
                },
                "value" : {
                    "hours" : 13,
                    "pv" : 23,
                    "ip" : 5,
                    "ui" : 2
                }
            }, 
            {
                "_id" : {
                    "hours" : 14.0
                },
                "value" : {
                    "hours" : 14,
                    "pv" : 33,
                    "ip" : 9,
                    "ui" : 2
                }
            }, 
            {
                "_id" : {
                    "hours" : 15.0
                },
                "value" : {
                    "hours" : 15,
                    "pv" : 52,
                    "ip" : 14,
                    "ui" : 4
                }
            }
        ],
        "counts" : {
            "input" : NumberLong(180),
            "emit" : NumberLong(180),
            "reduce" : NumberLong(26),
            "output" : NumberLong(13)
        },
        "timeMillis" : 250,
        "timing" : {
            "shardProcessing" : 152,
            "postProcessing" : 97
        },
        "shardCounts" : {
            "ins-cluster-01/ins-mongodb-01:27018,ins-mongodb-03:27018" : {
                "input" : 90,
                "emit" : 90,
                "reduce" : 9,
                "output" : 12
            },
            "ins-cluster-02/ins-mongodb-02:27019,ins-mongodb-04:27019" : {
                "input" : 90,
                "emit" : 90,
                "reduce" : 8,
                "output" : 10
            }
        },
        "postProcessCounts" : {
            "ins-cluster-01/ins-mongodb-01:27018,ins-mongodb-03:27018" : {
                "input" : NumberLong(22),
                "reduce" : NumberLong(9),
                "output" : NumberLong(13)
            }
        },
        "ok" : 1.0
    }
    results

    只看results返回的数据就可以

    接着就可以借助这里的js脚本移植到C#程序中

      public dynamic Today(int id)
            {
                //var rel = db.DataBase.Eval("show_visited_by_hours(" + id + ")").ToBsonDocument().ToDynamic();
                //return rel;
    
                var query = Query.And(
                            Query.EQ("sh.si", id),
                            Query.GTE("_id", new ObjectId(new DateTime(DateTime.Now.Year, DateTime.Now.Month, DateTime.Now.Day), 0, 0, 0))
                        );//筛选数据
                var map = new BsonJavaScript(@"function () {
                    emit(                   
                        {hours:this._id.getTimestamp().getHours()},{pv:1,ip:this.ip,ui:this.ui,hours:this._id.getTimestamp().getHours()}        
                        );
                     }");//数据按需分组
    
                var reduce = new BsonJavaScript(@"function(k,values){
                var data = {
                    hours:0,
                    pv: 0,
                    ip:[],
                    ui: []                        
                   };
                 values.forEach(function (v) {
                            data.hours=v.hours;           
                            data.pv+=v.pv;
                            data.ui.push(v.ui);
                            data.ip.push(v.ip);
                            });        
                        return data; 
                }
                ");//根据map传递参数做数据操作
                var finalize = new BsonJavaScript(@"function(key,v){
                v.hours=NumberInt(v.hours);
                v.pv=NumberInt(v.pv);
                if(v.ui)
                v.ui=NumberInt(ArrayGroup(v.ui).uni - 1);
                else
                v.ui=NumberInt(0);
                if(v.ip)
                v.ip=NumberInt(ArrayGroup(v.ip).uni - 1);
                else
                v.ip=NumberInt(0);
                return v;
                }");//按照指定格式编排输出数据格式
                MapReduceOptionsBuilder mrob = new MapReduceOptionsBuilder();//定义附加项MapReduceOptions
                mrob.SetFinalize(finalize);
                mrob.SetQuery(query);
                mrob.SetOutput(MapReduceOutput.Inline);//选择直接输出结果
                var rel = db.DataBase.GetCollection("visits").MapReduce(map,reduce, mrob); //提交执行MapReduce
                var valuearray=rel.InlineResults;//获取执行结果集
                var pvcount = 0;
                var ipcount = 0;
                var uicount = 0;
                var count = new List<BsonDocument>();
                foreach (var vitem in valuearray)
                {
                    pvcount+= (int)vitem["value"]["pv"];
                    ipcount += (int)vitem["value"]["ip"];
                    uicount += (int)vitem["value"]["ui"];
                    count.Add(vitem["value"].ToBsonDocument());
                }
                var result = new { pv= pvcount, ip = ipcount, ui = uicount, count = count };//按指定格式组装数据
                return result.ToBsonDocument().ToDynamic();
            }

    过程还需要多去揣摩至于Aggregate就相对很简单了,把所有的查询对象,查询聚合使用BsonDocument做好参数格式化。

    再附上一个Aggregate在C#中的使用方法,相对较容易理解。

    public string func_api_site_count(int sid = 0, int st = 20130226, int et = 20141231, string flag = "year", string order = "pv desc")
            {
                var sitesday = _dbs.DataBase.GetCollection("log_sites_day");
                var match = new QueryDocument();
                match.AddRange(new Dictionary<string, object> {
                    { "_id.day", new Dictionary<string, object> { { "$gte", st }, { "$lte", et } }},
                    { "_id.sid", new Dictionary<string, object> { { "$eq", sid } }}
                });
                var group = new Dictionary<string, object>();
                var sort = new Dictionary<string, BsonValue>();
                var query = new Dictionary<string, object>();
                switch (flag)
                {
                    case "month":
                        query.Add("$substr", new List<object>() { "$_id.day", 0, 6 });
                        break;
                    case "year":
                        query.Add("$substr", new List<object>() { "$_id.day", 0, 4 });
                        break;
                    case "day":
                        query.Add("$_id.day", "$_id.day");
                        break;
                    case "week":
                        break;
                }
    
                group.Add("_id", query);
                group.Add("pv", new BsonDocument("$sum", "$pv"));
                group.Add("uv", new BsonDocument("$sum", "$uv"));
                group.Add("ui", new BsonDocument("$sum", "$ui"));
                group.Add("ip", new BsonDocument("$sum", "$ip"));
                group.Add("nuv", new BsonDocument("$sum", "$nuv"));
                group.Add("area", new BsonDocument("$push", "$area"));
                group.Add("rf", new BsonDocument("$push", "$rfsite"));
                var groupby = new Dictionary<string, BsonValue>{
                     {"_id",null},
                     {"day",new BsonDocument("$push", "$_id")},
                     {"uv",new BsonDocument("$push", "$uv")},
                     {"pv",new BsonDocument("$push", "$pv")},
                     {"ui",new BsonDocument("$push", "$ui")},
                     {"ip",new BsonDocument("$push", "$ip")},
                     {"nuv",new BsonDocument("$push", "$nuv")},
                     {"area",new BsonDocument("$push", "$area")},
                     {"rfsite",new BsonDocument("$push", "$rf")},
                     {"total",new BsonDocument("$sum", 1)},
                     {"totalpv",new BsonDocument("$sum", "$pv")},
                     {"totaluv",new BsonDocument("$sum", "$uv")},
                     {"totalui",new BsonDocument("$sum", "$ui")},
                     };
    
                var project = new Dictionary<string, BsonValue>
                {
                    {"day",1},
                     {"pv",1},
                     {"_id",0},
                     {"total",new BsonDocument() {
                         new BsonElement("row","$total"),
                         new BsonElement("pv","$totalpv"),
                         new BsonElement("uv","$totaluv"),
                         new BsonElement("ui","$totalui"),
                     }},
                     {"area",1},
                      {"uv",1},
                      {"ui",1},
                      {"nuv",1},
                      {"ip",1},
                      {"rfsite",1},
                };
    
                var sitedayOptions = new List<BsonDocument>() {
                     new BsonDocument("$match",BsonDocument.Create(match)),
                     new BsonDocument("$group",BsonDocument.Create(group)),
                     new BsonDocument("$sort",new BsonDocument("_id",1)),
                     new BsonDocument("$group",BsonDocument.Create(groupby)),
                      new BsonDocument("$project",BsonDocument.Create(project)),
                };
    
                var rel = sitesday.Aggregate(sitedayOptions);
                //return rel.ToBsonDocument().ToString();
    
                var result = rel.ResultDocuments.ElementAtOrDefault(0);
                if (rel != null && rel.ResultDocuments.Count() > 0)
                {
                    var rfbson= new BsonDocument() { };
                    result.ToBsonDocument().Add("rf", new BsonDocument() { });
                    foreach (var rfitem in result["rfsite"][0].AsBsonArray)
                    {
                        foreach (var ritem in rfitem.ToBsonDocument())
                        {
                            if (result["rf"].ToBsonDocument().FirstOrDefault(e => e.Name.Equals(ritem.Name)) == null)
                            {
                                rfbson.Add(ritem.Name, ritem.Value.AsInt32);
                            }
                            else
                            {
                                rfbson[ritem.Name] = (rfbson[ritem.Name].AsInt32 + ritem.Value.AsInt32);
                            }
                        }
                    }
                    result["rf"] = rfbson;
                }
                return result.ToBsonDocument().ToString();
            }
  • 相关阅读:
    Git远程操作
    696. Count Binary Substrings
    693. Binary Number with Alternating Bits
    821. Shortest Distance to a Character
    345. Reverse Vowels of a String
    89. Gray Code
    数组操作符重载
    C++字符串反转
    马克思的两面性-来自网友
    C++字符串
  • 原文地址:https://www.cnblogs.com/loyung/p/7375036.html
Copyright © 2011-2022 走看看