zoukankan      html  css  js  c++  java
  • elasticsearch bulk

    情景介绍

    公司2000W的数据从mysql 迁移至elasticsearch,以提供微服务。本文基于elasticsearch-py bulk操作实现数据迁移。相比于elasticsearch-dump,自由度更大,并能够进行数据处理。

    API 原理

    让我们先来看一下官方文档给出的栗子

    POST _bulk
    { "index" : { "_index" : "test", "_id" : "1" } }
    { "field1" : "value1" }
    { "delete" : { "_index" : "test", "_id" : "2" } }
    { "create" : { "_index" : "test", "_id" : "3" } }
    { "field1" : "value3" }
    { "update" : {"_id" : "1", "_index" : "test"} }
    { "doc" : {"field2" : "value2"} }
    

    我们可以通过kibana试验一下

    elasticsearch-py

    elasticsearch-py 官方文档
    这里实际上我使用的是es-py的接口,栗子如下

    def gendata():
        mywords = ['foo', 'bar', 'baz']
        for word in mywords:
            yield {
                "_index": "mywords",
                "_type": "document",
                "doc": {"word": word}, # field1: "value1"
            }
    
    bulk(es, gendata())
    

    实际操作

    涉及到数据读取,以及批量的大小。一般建议是1000-5000个文档,如果你的文档很大,可以适当减少队列,大小建议是5-15MB,默认不能超过100M

    import re
    
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    import pymysql
    
    
    es = Elasticsearch()
    conn = pymysql.connect('127.0.0.1',"root","root","literature",charset='utf8')
    
    
    def read(conn,tableName):
        cursor = conn.cursor()
        sql = "show columns from {};".format(tableName)
        cursor.execute(sql)
        columns = [i[0] for i in cursor.fetchall()]
    
        select = "select * from {};".format(tableName)
        nums = cursor.execute(select)
        for i in range(nums):
            yield {k:v for k,v in zip(columns,cursor.fetchone())}
    
    
    def bulk_insert(d):
        actions = []
        for i in d:
            _id = i.get('id')
            # 数据处理逻辑
            i['autor'] = i.get('autor').split(',')
            i['artkeyword'] = re.sub(r'[[]d]',"",str(i.get('artkeyword',""))).strip(';').split(';')
            i['dateofpublication'] = i.get('dateofpublication').strftime('%Y-%m-%d') # 注意需要将datetime格式转换成字符串类型
            i['dateofsummery'] = i.get('dateofsummery').strftime('%Y-%m-%d %H:%M:%S') # 注意需要将datetime格式转换成字符串类型
            #
            action = {
                "_index":"literature",
                "_type":"_doc",
                "_id":_id,
                }
            action.update(i)
            actions.append(action)
            if len(actions) == 500:
                helpers.bulk(es,actions)
                actions = []
        if (len(actions) > 0):
            helpers.bulk(es, actions)
    
    
    if __name__ == "__main__":
        d = read(conn,"literature_info")
        bulk_insert(d)
        conn.close()
    
    
    
  • 相关阅读:
    AX2009直接交运的bug
    数据库日志
    新蛋中国最新的分类导航,右侧展开菜单,可以修改向左或者向右展开
    用图片代替滚动条的代码
    新蛋网的大图展示效果,缩略图点击显示大图,上一个下一个
    Banner 切换,大小图不同,支持FF和OPERA,IE系列
    下拉菜单,支持所有浏览器
    电容选型
    000.数字电子技术分类
    Altium design16设计技巧
  • 原文地址:https://www.cnblogs.com/zenan/p/11132485.html
Copyright © 2011-2022 走看看