  • MongoDB bulk inserts (insert_many, bulk_write): converting a Spark DataFrame to JSON and writing it to MongoDB

    https://blog.csdn.net/nihaoxiaocui/article/details/95060906

    https://xuexiyuan.cn/article/detail/173.html

    from etlsdk.lib.datasources.datasource_factory import DatasourceFactory
    from data_pipeline.df_transform.transform import DataframeTransform
    from data_pipeline.utils.utils import get_extractor_cls
    import json
    import math
    from pymongo import MongoClient, ReplaceOne, InsertOne
    
    
    class CommonMongodbPlugin(object):
        """
        Transforms a DataFrame into a new DataFrame and writes it to MongoDB.
        """
    
        def write2mongo(self, iterator):
            # x.asDict() converts each Row into a dict
            result_list = []
            for x in iterator:
                doc = x.asDict()
                if self.id:
                    # Upsert: replace the document with the same _id, or insert it if absent
                    result_list.append(ReplaceOne({'_id': doc[self.id]}, doc, upsert=True))
                else:
                    result_list.append(InsertOne(doc))
            if len(result_list) > 0:
                client = MongoClient(self.mongo_url)
                try:
                    collection = client.get_database(self.mongo_db)[self.mongo_collection]
                    collection.bulk_write(result_list, ordered=False, bypass_document_validation=True)
                finally:
                    # Release the connection opened for this partition
                    client.close()
    
        def run(self, inputs, outputs, args):
            """
            Extract the configured keys from the input DataFrame and bulk-write the rows to MongoDB.

            python3 -m etlsdk.main data_pipeline.plugins.bluebook.common_parsed2mongodb.CommonMongodbPlugin.run \
                --input input_table:name=OSS_default:amazoncrawl:Jingji21/ \
                --partition "2019-11-07 10:00:00" \
                --args keys:'["item_id","article_id","content","title","site","news_url","summary","create_time"]' \
                --args mongo:'{"mongo_url": "mongodb://<username>:<password>@<ip>:<port>/<auth_db>","mongo_db":"<db_name>","mongo_collection":"<collection_name>"}' \
                --args extractor:data_pipeline.extractors.blue_book.jingji21.Jingji21Extractor \
                --args _id:"item_id"
            """
            input_df = inputs['input_table']['df']
            # Resolve the Extractor class, e.g. "data_pipeline.extractors.blue_book.jingji21.Jingji21Extractor"
            ExtractorCls = get_extractor_cls(args["extractor"])  # get_extractor_cls() returns the Extractor processing class
            # Column names of the output DataFrame
            keys = args["keys"] if isinstance(args["keys"], list) else json.loads(args["keys"])
            self.id = args.get("_id")
    
            class Extractor(ExtractorCls):  # subclass of ExtractorCls
                namespace = args.get("namespace", 'production')
    
            columns_selected = None
            df = DataframeTransform.struct2struct(input_df, Extractor, keys, columns_selected)
            mymongo = args["mongo"] if isinstance(args["mongo"], dict) else json.loads(args["mongo"])
            self.mongo_url = mymongo["mongo_url"]
            self.mongo_db = mymongo["mongo_db"]
            self.mongo_collection = mymongo["mongo_collection"]
            df_count = df.count()
            # Aim for roughly 500 rows per partition
            partition_number = math.ceil(df_count / 500)
            # Only repartition when more than one partition is needed;
            # an empty DataFrame would otherwise yield the invalid repartition(0)
            if partition_number > 1:
                df = df.repartition(partition_number)
            df.foreachPartition(self.write2mongo)  # runs write2mongo once per partition
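The plugin above keeps each `bulk_write` call to roughly 500 rows by counting the DataFrame and repartitioning it, which costs an extra `count()` job and a shuffle. A minimal sketch of an alternative, assuming several `bulk_write` calls per partition are acceptable: slice the partition iterator into fixed-size batches instead. The helper names `batched` and `partition_count` are hypothetical and not part of the original plugin, and plain dicts stand in for `Row.asDict()` so the sketch runs without Spark or MongoDB.

```python
import math
from itertools import islice


def partition_count(row_count, rows_per_partition=500):
    """Partitions needed so that each holds at most rows_per_partition rows.

    Never returns less than 1, so the result is always a valid
    argument for DataFrame.repartition().
    """
    return max(1, math.ceil(row_count / rows_per_partition))


def batched(iterator, batch_size=500):
    """Yield lists of at most batch_size items from any iterable."""
    iterator = iter(iterator)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Plain dicts stand in for the result of Row.asDict() (assumption for this sketch).
rows = ({"item_id": i, "title": "t%d" % i} for i in range(1200))
sizes = [len(batch) for batch in batched(rows, 500)]
print(sizes)  # [500, 500, 200]
print(partition_count(1200), partition_count(0))  # 3 1
```

Inside `write2mongo`, each batch from `batched(iterator)` would be converted to `ReplaceOne` or `InsertOne` operations and passed to `collection.bulk_write`, so no partition has to be held in memory as one large list.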
  • Original article: https://www.cnblogs.com/wang102030/p/11839687.html