方法一 aggergate:
aggregate是mongo管道方法,使用管道运行百万以上数据需要注意:
- batchsize (如果长时间连接mongo会导致游标丢失错误,这个参数是每次读取数量,可以设置为固顶置,不断返回数据,游标不会丢失)
- allowDiskUse(是允许最大占用最大内存,设置为True,不会报内存不够的错误)
- 上面这两个参数需要用=号键值对的方式设置,不能用:字典的形式
- pipline
- 用bulk_wirte进行去重,bulk的两个参数也需要设置
- 用python的迭代器去运行
4、pipline里的参数;
db.school.aggregate( [ {$match:{time:{$gt:1513612800}}}, {$group:{_id:{insituteName:"$institute",class:"$name"},count:{$sum:1}}}, {$sort:{count:-1}}, {$limit:10}] ) 注: 1. $match 表示查询条件 2. $group 表示分组,_id固定写法, 如果按照多个字段分组,则字段必须都在_id对象中,可以取别名,例如上面示例,输出结果为: { "_id" : { "insituteName" : "数学系",class: "一班"}, "count" : 200 } { "_id" : { "insituteName" : "英语系",class: "二班" }, "count" : 201 } 如果按照一个字段分组,则可以写为{$group:{_id:"$institute",count:{$sum:1}}},此时,查询结果为: { "_id" : "数学系", "count" : 1000 } { "_id" : "英语系", "count" : 800 } 若没有分组字段,则{$group:{_id:null,count:{$sum:1}}} 3. $sort 排序,1——正序,-1——倒序 4. $limit 限制输出数据条数 5. count:{$sum:1},1表示统计查询结果数量,如果想统计time字段和则使用 count:{$sum:"$time"}。 聚合函数有:min,max,sum,avg 6. $project 表示选择显示的字段,1——显示,0——不显示,如下示例: db.school.aggregate([{$project:{insituteName:1,"time:1,_id:0}},{$match:{"time:{$gt:1513612800}}},{$sort:{"time:-1}},{$limit:10}]) 输出: { "insituteName" : "数学系", "time" : 1521392371 } { "name" : "英语系", "time" : 1521392370 } 7. $skip在跳过指定数量的文档,并返回余下的文档,例如,下面跳过前5条文档,显示第6到第10条: db.school.aggregate([{$project:{time:1,name:1,_id:0}},{$match:{time:{$gt:1513612800}}},{$group:{_id:{className:"$name"},count:{$sum:1}}},{$sort:{count:-1}},{$limit:10},{$skip:5}] 注释: $project先筛选出所有文档的time和name两个字段, $match然后筛选出时间大于1513612800的文档, $group按照name字段分组,重命名为className, $sum统计服务条件的文档数量, $sort按照数量count字段倒序排序, $limit限制一共输出前10条文档; $skip跳过前5条文档,输出第6到第10条文档。 参考: https://blog.csdn.net/liuxiaoxiaosmile/article/details/79666391
5和6、python的迭代器,如果数据量过大,bulk_write也会报错,那么需要用python的iter去运行
示例:
map_id = map(lambda doc: doc['dups'][1:], infoall_quali.aggregate( pipeline=pipeline, batchSize=200, allowDiskUse=True )) list_id = [item for sublist in map_id for item in sublist] alist = iter(list_id) anum = 1 count = 1000 len_resultlist = len(list_id) print('Please wait for Copy database {}'.format(len_resultlist)) while True: if (anum - 1) * count >= len_resultlist: break blist = islice(alist, 0, count) result = infoall_quali.bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), [x for x in blist])), ordered=False, bypass_document_validation=True ).bulk_api_result print(anum,result, datetime.datetime.now()) anum += 1 myclient.close()
方法二、mapreduce:
MongoDB中的MapReduce主要有以下几阶段:
-
Map:把一个操作Map到集合中的每一个文档
-
Shuffle: 根据Key分组对文档,并且为每个不同的Key生成一系列(>=1个)的值表(List of values)。
-
Reduce: 处理值表中的元素,直到值表中只有一个元素。然后将值表返回到Shuffle过程,循环处理,直到每个Key只对应一个值表,并且此值表中只有一个元素,这就是MR的结果。
-
Finalize:此步骤不是必须的。在得到MR最终结果后,再进行一些数据“修剪”性质的处理。