  • Batch-syncing data from MongoDB to SQL Server

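    Business background: data from the older version of the system was stored in SQL Server, and its fields have diverged considerably over several revisions. The data now held in MongoDB therefore has to be synced into SQL Server so the two can be combined for calculation.
    1. Write the query for the data you need first, then do the aggregation and field mapping. You need to know beforehand how the SQL Server columns correspond to the MongoDB fields.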

    2. Write the data to SQL Server in batches.
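
    The mapping in step 1 can be sketched as a plain dictionary that drives both the `$project` stage and the row tuples. This is only an illustration: `FIELD_MAP`, `build_project` and `to_row` are hypothetical names, and the mapping shows just a few of the fields used in the script below.

```python
# Hypothetical mapping: SQL Server column -> MongoDB field.
# None means "no source field, write NULL".
# The key order is assumed to match the column order of the target table.
FIELD_MAP = {
    "ID": "pid",
    "NoOfRead": "NumberofRead",
    "UserName": "Author",
    "UserInfo": None,
    "url": "URL",
}


def build_project(field_map):
    """Build a $project stage that renames Mongo fields to column names."""
    stage = {"_id": 0}
    for column, source in field_map.items():
        if source is not None:
            stage[column] = "$" + source
    return {"$project": stage}


def to_row(doc, field_map):
    """Turn one projected document into a tuple in table column order.

    Columns with no source field, or whose field is absent from the
    document, become None so every tuple has the same length.
    """
    return tuple(doc.get(column) for column in field_map)
```

    Reading values by column name rather than relying on `doc.values()` keeps every row aligned with the table even when a document lacks one of the source fields.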

    Code:

    #!/usr/bin/python3
    # -*- coding: utf-8 -*-
    # @Time : 2021-09-16 11:58
    # @Author : BruceLong
    # @FileName: yiche_mongo_to_sqlserver.py
    # @Email   : 18656170559@163.com
    # @Software: PyCharm
    # @Blog :http://www.cnblogs.com/yunlongaimeng/
    import pymssql
    import time
    
    import pymongo
    
    
    class MongoToSqlServer:
        def __init__(self):
            self.cd_mongo = pymongo.MongoClient(
                "mongodb://localhost:27017/")
            self.coll = self.cd_mongo['ClientIds']['C102101']
            self.conn = pymssql.connect('127.0.0.1', 'admin', 'admin', 'DC_BitAuto_BBS')
            self.cur = self.conn.cursor()
            self.count = 0  # documents already paged past (the $skip offset)
            self.limit = 1000  # batch size per aggregate/insert round
            self.table = 'bitautoBBS_newByMonth'
    
        def select_mongo(self):
            # ctime = time.strftime('%Y-%m-%d', time.localtime(time.time() - 60 * 60 * 24))
            ctime = '2021-09-01'
            ctime_sql = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            # Keys follow the column order of the target table, because the
            # INSERT below is positional; values are the Mongo source fields.
            project = {
                "_id": 0,
                "ID": "$pid",
                "NoOfRead": "$NumberofRead",
                "NoOfReply": "$NumberofReplys",
                "UserName": "$Author",
                "UserInfo": None,
                "ArticleTitle": "$articleTitle",
                "ArticleContent": "$reply",
                "ArticleTime": "$replyDate",
                "Thread_ID": "$ThreadId",
                "Is_Top": None,
                "Forum": None,
                "fname": "$ForumName",
                "url": "$URL",
                "CreateTime": ctime_sql
            }
            columns = [key for key in project if key != "_id"]
            sql = "INSERT INTO [dbo].[{table}] VALUES ({values})".format(
                table=self.table, values=', '.join(['%s'] * len(columns)))
            while True:
                pipeline = [
                    {"$match": {"realChannel": "yiche", "CreateTime": {"$gte": ctime}}},
                    # Sort so that $skip/$limit pages are stable between iterations.
                    {"$sort": {"_id": 1}},
                    {"$skip": self.count},
                    {"$limit": self.limit},
                    {"$project": project},
                ]
                # $project omits fields that a document lacks, so read by column
                # name to keep every tuple aligned with the table.
                datas = [tuple(doc.get(col) for col in columns)
                         for doc in self.coll.aggregate(pipeline)]
                if not datas:
                    return
                self.count += len(datas)
                self.cur.executemany(sql, datas)
                self.conn.commit()
                print(self.count, self.limit, len(datas))
    
        def run(self):
            try:
                self.select_mongo()
            finally:
                # Release both connections even if a batch fails.
                self.conn.close()
                self.cd_mongo.close()
    
    
    if __name__ == '__main__':
        mongo = MongoToSqlServer()
        mongo.run()
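
    The batched write in step 2 can also be factored into small helpers that work with any DB-API connection. This is a rough sketch, not the script's own method: `chunked`, `build_insert` and `write_batches` are hypothetical names, and the explicit column list assumes the SQL Server column names are the same as the projected keys, which the original post does not confirm.

```python
from itertools import islice


def chunked(rows, size):
    """Yield lists of at most `size` rows from any iterable."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def build_insert(table, columns):
    """Build a parameterized INSERT with an explicit column list.

    pymssql uses %s placeholders; the column names here are an assumption.
    """
    cols = ", ".join("[{}]".format(c) for c in columns)
    marks = ", ".join(["%s"] * len(columns))
    return "INSERT INTO [dbo].[{}] ({}) VALUES ({})".format(table, cols, marks)


def write_batches(conn, sql, rows, size=1000):
    """Insert rows batch by batch, committing after each batch.

    Returns the number of rows written.
    """
    cur = conn.cursor()
    total = 0
    for batch in chunked(rows, size):
        cur.executemany(sql, batch)
        conn.commit()
        total += len(batch)
    return total
```

    Naming the columns in the INSERT means a later change to the table's column order would not silently shift values into the wrong columns, which a positional `VALUES (...)` cannot guard against.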
  • Original post: https://www.cnblogs.com/yunlongaimeng/p/15343859.html