zoukankan      html  css  js  c++  java
  • 用pymongo 删除mongodb中的重复数据(aggregate mapreduce)

    方法一    aggergate:

    aggregate是mongo管道方法,使用管道运行百万以上数据需要注意:

    1. batchsize (如果长时间连接mongo会导致游标丢失错误,这个参数是每次读取数量,可以设置为固顶置,不断返回数据,游标不会丢失)
    2. allowDiskUse(是允许最大占用最大内存,设置为True,不会报内存不够的错误)
    3. 上面这两个参数需要用=号键值对的方式设置,不能用:字典的形式
    4. pipline
    5. 用bulk_wirte进行去重,bulk的两个参数也需要设置
    6. 用python的迭代器去运行

    4、pipline里的参数;

    db.school.aggregate(
    [
    {$match:{time:{$gt:1513612800}}},
    {$group:{_id:{insituteName:"$institute",class:"$name"},count:{$sum:1}}},
    {$sort:{count:-1}},
    {$limit:10}]
    )
    注:
    1. $match 表示查询条件
    2. $group 表示分组,_id固定写法,
       如果按照多个字段分组,则字段必须都在_id对象中,可以取别名,例如上面示例,输出结果为:
       { "_id" : { "insituteName" : "数学系",class: "一班"}, "count" : 200 }
       { "_id" : { "insituteName" : "英语系",class: "二班" }, "count" : 201 } 
       如果按照一个字段分组,则可以写为{$group:{_id:"$institute",count:{$sum:1}}},此时,查询结果为:
       { "_id" : "数学系", "count" : 1000 }
       { "_id" : "英语系", "count" : 800 }
       若没有分组字段,则{$group:{_id:null,count:{$sum:1}}}
    3. $sort 排序,1——正序,-1——倒序
    4. $limit 限制输出数据条数
    5. count:{$sum:1},1表示统计查询结果数量,如果想统计time字段和则使用 count:{$sum:"$time"}。
        聚合函数有:min,max,sum,avg
    6. $project 表示选择显示的字段,1——显示,0——不显示,如下示例:
    db.school.aggregate([{$project:{insituteName:1,"time:1,_id:0}},{$match:{"time:{$gt:1513612800}}},{$sort:{"time:-1}},{$limit:10}])
    输出:
       { "insituteName" : "数学系", "time" : 1521392371 }
       { "name" : "英语系", "time" : 1521392370 }
    7. $skip在跳过指定数量的文档,并返回余下的文档,例如,下面跳过前5条文档,显示第6到第10条:
    
    db.school.aggregate([{$project:{time:1,name:1,_id:0}},{$match:{time:{$gt:1513612800}}},{$group:{_id:{className:"$name"},count:{$sum:1}}},{$sort:{count:-1}},{$limit:10},{$skip:5}]
    
    注释:
    $project先筛选出所有文档的time和name两个字段,
    $match然后筛选出时间大于1513612800的文档,
    $group按照name字段分组,重命名为className,
    $sum统计服务条件的文档数量,
    $sort按照数量count字段倒序排序,
    $limit限制一共输出前10条文档;
    $skip跳过前5条文档,输出第6到第10条文档。
    
    参考:
    https://blog.csdn.net/liuxiaoxiaosmile/article/details/79666391
    

      

    5和6、python的迭代器,如果数据量过大,bulk_write也会报错,那么需要用python的iter去运行

    示例:

    map_id = map(lambda doc: doc['dups'][1:], infoall_quali.aggregate(
                                                                          pipeline=pipeline,
                                                                          batchSize=200,
                                                                          allowDiskUse=True
                                                                          ))
    list_id = [item for sublist in map_id for item in sublist]
    alist = iter(list_id)
    anum = 1
    count = 1000
    len_resultlist = len(list_id)
    print('Please wait for Copy database {}'.format(len_resultlist))
    while True:
        if (anum - 1) * count >= len_resultlist:
            break
        blist = islice(alist, 0, count)
        result = infoall_quali.bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), [x for x in blist])),
                                              ordered=False,
                                              bypass_document_validation=True
                                              ).bulk_api_result
        print(anum,result, datetime.datetime.now())
        anum += 1
    myclient.close()
    

    方法二、mapreduce:

    MongoDB中的MapReduce主要有以下几阶段:

    • Map:把一个操作Map到集合中的每一个文档

    • Shuffle: 根据Key分组对文档,并且为每个不同的Key生成一系列(>=1个)的值表(List of values)。

    • Reduce: 处理值表中的元素,直到值表中只有一个元素。然后将值表返回到Shuffle过程,循环处理,直到每个Key只对应一个值表,并且此值表中只有一个元素,这就是MR的结果。

    • Finalize:此步骤不是必须的。在得到MR最终结果后,再进行一些数据“修剪”性质的处理。

  • 相关阅读:
    Java实现 LeetCode 530 二叉搜索树的最小绝对差(遍历树)
    Java实现 LeetCode 530 二叉搜索树的最小绝对差(遍历树)
    Java实现 LeetCode 530 二叉搜索树的最小绝对差(遍历树)
    Java实现 LeetCode 529 扫雷游戏(DFS)
    Java实现 LeetCode 529 扫雷游戏(DFS)
    Java实现 LeetCode 529 扫雷游戏(DFS)
    嵌入式/X86下linux系统死机及内存优化
    gcc指定头文件路径及动态链接库路径
    嵌入式 hi3518c裸板uboot烧写、kernel烧写、fs烧写小结
    pthread_attr_init线程属性
  • 原文地址:https://www.cnblogs.com/Robertzewen/p/10265636.html
Copyright © 2011-2022 走看看