  • elasticsearch bulk

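    Background

    Our company needed to migrate 20 million records from MySQL to Elasticsearch to back a microservice. This post implements the migration with the bulk helpers in elasticsearch-py. Compared with elasticsearch-dump, this approach is more flexible and lets you process the data along the way.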

    How the API works

    Let's start with the example given in the official documentation:

    POST _bulk
    { "index" : { "_index" : "test", "_id" : "1" } }
    { "field1" : "value1" }
    { "delete" : { "_index" : "test", "_id" : "2" } }
    { "create" : { "_index" : "test", "_id" : "3" } }
    { "field1" : "value3" }
    { "update" : {"_id" : "1", "_index" : "test"} }
    { "doc" : {"field2" : "value2"} }
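    The request body above is newline-delimited JSON: each operation is an action line, followed by a source line for every operation except delete. As a minimal sketch of how such a body could be assembled by hand (the function name to_bulk_body and the pair-based input format are illustrative assumptions, not part of any library):

```python
import json

def to_bulk_body(operations):
    """Serialize (action, source) pairs into an NDJSON _bulk request body.

    `operations` is an iterable of (action_dict, source_dict_or_None).
    """
    lines = []
    for action, source in operations:
        lines.append(json.dumps(action))
        if source is not None:  # a delete operation has no source line
            lines.append(json.dumps(source))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"

# The same four operations as in the documentation example.
operations = [
    ({"index": {"_index": "test", "_id": "1"}}, {"field1": "value1"}),
    ({"delete": {"_index": "test", "_id": "2"}}, None),
    ({"create": {"_index": "test", "_id": "3"}}, {"field1": "value3"}),
    ({"update": {"_id": "1", "_index": "test"}}, {"doc": {"field2": "value2"}}),
]
body = to_bulk_body(operations)
print(body)
```

    In practice the elasticsearch-py helpers build this body for you, so writing it by hand is mostly useful for understanding what goes over the wire.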
    

    You can try this request out in Kibana.

    elasticsearch-py

    See the official elasticsearch-py documentation.
    What I actually use here is the helper interface that elasticsearch-py provides. An example:

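    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    es = Elasticsearch()

    def gendata():
        mywords = ['foo', 'bar', 'baz']
        for word in mywords:
            yield {
                "_index": "mywords",
                "_type": "document",
                "doc": {"word": word}, # field1: "value1"
            }

    bulk(es, gendata())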
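    Each dictionary passed to the helper mixes metadata keys (those starting with an underscore, such as _index and _id) with the document's own fields. The following is a simplified approximation of how that split might work, not the library's actual code; split_action and META_KEYS are hypothetical names, and the real helper recognises more metadata keys than the three listed here.

```python
# Metadata keys handled by this sketch (an assumption; the real helper knows more).
META_KEYS = ("_index", "_type", "_id")

def split_action(action, default_op="index"):
    """Split one helper-style action dict into (action line, source line)."""
    data = dict(action)  # work on a copy so the caller's dict is untouched
    op = data.pop("_op_type", default_op)
    meta = {key: data.pop(key) for key in META_KEYS if key in data}
    # Whatever is left is the document, unless it was given under "_source".
    source = data.get("_source", data)
    return {op: meta}, source

header, source = split_action(
    {"_index": "mywords", "_type": "document", "doc": {"word": "foo"}}
)
print(header)
print(source)
```

    Under this reading, the "doc" key in the example is not metadata, so it ends up as an ordinary field of the indexed document.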
    

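    Putting it into practice

    Two things matter here: how the data is read and how large each batch is. The usual recommendation is 1,000-5,000 documents per batch, or fewer if your documents are large; aim for roughly 5-15 MB per request. By default a single request cannot exceed 100 MB.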
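    To make the batching rule concrete, here is a minimal sketch that cuts a stream of documents into batches limited by both document count and approximate serialized size. The function chunk_docs, its default limits, and the sample documents are illustrative assumptions; the size is estimated from the JSON encoding of each document.

```python
import json

def chunk_docs(docs, max_docs=1000, max_bytes=10 * 1024 * 1024):
    """Yield lists of docs, each within max_docs items and about max_bytes bytes."""
    batch, batch_bytes = [], 0
    for doc in docs:
        size = len(json.dumps(doc).encode("utf-8"))
        # Close the current batch before it would exceed either limit.
        if batch and (len(batch) >= max_docs or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(doc)
        batch_bytes += size
    if batch:  # flush the final partial batch
        yield batch

# Made-up sample data: 2,500 small documents.
docs = ({"id": n, "title": "x" * 100} for n in range(2500))
sizes = [len(batch) for batch in chunk_docs(docs, max_docs=1000)]
print(sizes)
```

    With elasticsearch-py you normally do not need to write this yourself: helpers.bulk accepts chunk_size and max_chunk_bytes arguments that serve the same purpose.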

    import re
    
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    import pymysql
    
    
    es = Elasticsearch()
    conn = pymysql.connect(host='127.0.0.1', user='root', password='root', database='literature', charset='utf8')  # PyMySQL 1.0+ requires keyword arguments
    
    
    def read(conn,tableName):
        cursor = conn.cursor()
        sql = "show columns from {};".format(tableName)
        cursor.execute(sql)
        columns = [i[0] for i in cursor.fetchall()]
    
        select = "select * from {};".format(tableName)
        nums = cursor.execute(select)
        for i in range(nums):
            yield {k:v for k,v in zip(columns,cursor.fetchone())}
    
    
    def bulk_insert(d):
        actions = []
        for i in d:
            _id = i.get('id')
            # Data-processing logic
            i['autor'] = i.get('autor').split(',')
            # Strip square brackets and digits, then split the keywords on ';'
            i['artkeyword'] = re.sub(r'[\[\]\d]',"",str(i.get('artkeyword',""))).strip(';').split(';')
            i['dateofpublication'] = i.get('dateofpublication').strftime('%Y-%m-%d') # datetime values must be converted to strings
            i['dateofsummery'] = i.get('dateofsummery').strftime('%Y-%m-%d %H:%M:%S') # datetime values must be converted to strings
            action = {
                "_index":"literature",
                "_type":"_doc",
                "_id":_id,
                }
            action.update(i)
            actions.append(action)
            if len(actions) == 500:
                helpers.bulk(es,actions)
                actions = []
        if (len(actions) > 0):
            helpers.bulk(es, actions)
    
    
    if __name__ == "__main__":
        d = read(conn,"literature_info")
        bulk_insert(d)
        conn.close()
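    The per-row processing in bulk_insert can also be pulled out into a pure function, which makes it testable without a database or a cluster. The sketch below is one possible way to do that under a few assumptions: transform_row is a hypothetical name, the sample row is made up, the regular expression assumes the intent is to strip square brackets and digits from the keyword string, and NULL columns are tolerated rather than raising an error.

```python
import re
from datetime import datetime

DATE_FORMATS = (
    ("dateofpublication", "%Y-%m-%d"),
    ("dateofsummery", "%Y-%m-%d %H:%M:%S"),
)

def transform_row(row):
    """Return a JSON-serializable copy of one MySQL row (field names from the script above)."""
    doc = dict(row)
    # Comma-separated authors become a list; a NULL column becomes an empty list.
    doc["autor"] = [a for a in (doc.get("autor") or "").split(",") if a]
    # Assumed intent: drop brackets and digits, then split keywords on ';'.
    keywords = re.sub(r"[\[\]\d]", "", str(doc.get("artkeyword") or ""))
    doc["artkeyword"] = [k for k in keywords.strip(";").split(";") if k]
    # datetime objects are not JSON-serializable, so format them as strings.
    for field, fmt in DATE_FORMATS:
        value = doc.get(field)
        doc[field] = value.strftime(fmt) if value is not None else None
    return doc

# Made-up sample row for illustration.
row = {
    "id": 1,
    "autor": "Li,Wang",
    "artkeyword": "[1]search;[2]index;",
    "dateofpublication": datetime(2019, 7, 4),
    "dateofsummery": datetime(2019, 7, 4, 12, 30, 0),
}
doc = transform_row(row)
print(doc)
```

    Keeping the transformation separate from the indexing loop also means a single bad row can be inspected on its own instead of failing in the middle of a batch.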
    
    
    
  • Original article: https://www.cnblogs.com/zenan/p/11132485.html