zoukankan      html  css  js  c++  java
  • mongodb数据迁移到新库并同时写入ES

    工作中遇到一个需求,要将旧系统的mongodb数据库全部迁移至新的系统中。新旧系统的数据结构不一致,旧系统设计的是两张表,新系统是一张。字段也发生了变化。

    1、实现方案

    连接mongodb数据库,逐条读取数据,并重新组装。最后写入新库和ES。

    程序实现并不复杂,但有几个注意的地方,记录一下。本文没有详细讲述具体的模块使用方法,如果需要就自行百度下。这种工具程序的业务定制化程度很高,无法完成通用的任务,仅供参考。

    完整的程序到github下载。下载地址

    2、结构

    conf 配置文件目录
    logs 日志文件目录
    util 工具目录
    data_migrate.py  入库文件main
    mongo_connect.py mongodb连接类
    write_es.py   ES连接类
    

    3、使用

    修改配置文件后,就可以使用了。里边的数据结构转换的代码按照具体的业务逻辑转换。
    全局配置文件

    填写数据库配置信息

    #读取数据库配置
    [MongodbR]
    host =
    port =
    user =
    password =
    db =  数据库1
    db2 =
    coll =  集合1
    coll2 =
    #写入数据库配置
    [MongodbW]
    host =
    port =
    user =
    password =
    db =
    coll =
    

    日志配置文件

    需要修改日志文件路径,报警级别

    [handler_fileHandler]
    class=logging.handlers.RotatingFileHandler
    level=DEBUG
    formatter=fmt
    args=('./logs/logdemo.log','a',20000,5,'utf-8')
    

    4、代码解读

    mongodb

    mongodb连接超时问题
    通常我们查询的使用res=coondocinfo.find() 一般来说没有问题,此时出来的只是游标,而不是数据结果集。所以在处理过程中需要长时间保持数据库的连接,
    但默认的mongodb数据库连接只有十分钟,如果读取大量数据的时候十分钟显然不够用,这项目一年的数据量有上亿条。所以需要使用:
    with coondocinfo.find({"ch": 1,'it': {"$gte": stime, "$lt": etime}},no_cursor_timeout=True) as cursor 这种方式保持mongodb连接,否则会报错

    with coondocinfo.find({"ch": 1,'it': {"$gte": stime, "$lt": etime}},no_cursor_timeout=True) as cursor:
      for i in cursor:
        logging.info("正在处理的_id是%s" % i['_id'])
        #读取doctext库,获取正文数据,已处理成字符串
        try:
          content = list(coondoctext.find({'_id' : i['_id']},{'content':1,'_id':0}))[0]['content']
        except Exception as e:
          logging.error(e)    
    

    mongodb字段_ID问题

    ES

    写入ES
    写入es的时候需要注意,

    id = data['_id']如果是mongodb读出来的数据会有_id这个字段,而es中也会默认给一个id字段,所以需要将数据结构的id删除掉。否则会报错。
      del data['_id']
      add(index='allchannel-iksmart', body=data,id=str(id))
    

    写入索引

    body = {"name": "long", "age": 11,"height": 111}
    add(index=index_name,body=body,id=1)
    
    :param index: 索引名称
    :param body:文档内容
    :param id: 是否指定id,如不指定就会使用生成的字符串
    

    MAIN

    stime 查询起始时间
    etime 查询截止时间
    formatdata 主逻辑函数

    if __name__ == '__main__':
      stime = 1577808000
      etime = 1580486400
      Data_migrate().formatdata(stime=stime,etime=etime)
    
    
  • 相关阅读:
    数据库模式
    数据流模式、转换、格式与操作
    桥接模式=抽象层协作关系+继承体系注入
    php 中更简洁的三元运算符 ?:
    larave5.6 将Excel文件数据导入数据库代码实例
    Laravel获取所有的数据库表及结构
    Laravel框架数据库CURD操作、连贯操作总结
    insert into 语句的三种写法
    Laravel 上传excel,读取并写入数据库 (实现自动建表、存记录值
    laravel5.4将excel表格中的信息导入到数据库中
  • 原文地址:https://www.cnblogs.com/zhaobowen/p/13269383.html
Copyright © 2011-2022 走看看