zoukankan      html  css  js  c++  java
  • elasticsearch之使用Python批量写入数据

    elasticsearch之使用Python批量写入数据

     

    顺序写入100条#top

    现在我们如果有大量的文档(例如10000000万条文档)需要写入es的某条索引中,该怎么办呢?之前学过的一次插入一条肯定不行:

    Copy
    import time
    from elasticsearch import Elasticsearch
    
    es = Elasticsearch()
    
    def timer(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            print('共耗时约 {:.2f} 秒'.format(time.time() - start))
            return res
        return wrapper
    
    @timer
    def create_data():
        """ 写入数据 """
        for line in range(100):
            es.index(index='s2', doc_type='doc', body={'title': line})
    
    if __name__ == '__main__':
        create_data()   # 执行结果大约耗时 7.79 秒
    

    上例为顺序向es的s2索引(该索引已存在)写入100条文档,而且值也仅是数字。却花费了大约7秒左右,这种速度在大量数据的时候,肯定不行。那怎么办呢?

    批量写入100条#top

    现在,来介绍一种批量写入的方式:

    Copy
    import time
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    
    es = Elasticsearch()
    
    def timer(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            print('共耗时约 {:.2f} 秒'.format(time.time() - start))
            return res
    
        return wrapper
    
    @timer
    def create_data():
        """ 写入数据 """
        for line in range(100):
            es.index(index='s2', doc_type='doc', body={'title': line})
    
    @timer
    def batch_data():
        """ 批量写入数据 """
        action = [{
            "_index": "s2",
            "_type": "doc",
            "_source": {
                "title": i
            }
        } for i in range(10000000)]
        helpers.bulk(es, action)
    
    
    if __name__ == '__main__':
        # create_data()
        batch_data()  # MemoryError
    

    我们通过elasticsearch模块导入helper,通过helper.bulk来批量处理大量的数据。首先我们将所有的数据定义成字典形式,各字段含义如下:

    • _index对应索引名称,并且该索引必须存在。
    • _type对应类型名称。
    • _source对应的字典内,每一篇文档的字段和值,可有有多个字段。

    首先将每一篇文档(组成的字典)都整理成一个大的列表,然后,通过helper.bulk(es, action)将这个列表写入到es对象中。
    然后,这个程序要执行的话——你就要考虑,这个一千万个元素的列表,是否会把你的内存撑爆(MemoryError)!很可能还没到没到写入es那一步,却因为列表过大导致内存错误而使写入程序崩溃!很不幸,我的程序报错了。下图是我在生成列表的时候,观察任务管理器的进程信息,可以发现此时Python消耗了大量的系统资源,而运行es实例的Java虚拟机却没什么变动。

    解决办法是什么呢?我们可以分批写入,比如我们一次生成长度为一万的列表,再循环着去把一千万的任务完成。这样, Python和Java虚拟机达到负载均衡。
    下面的示例测试10万条数据分批写入的速度:

    Copy
    import time
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    
    es = Elasticsearch()
    
    def timer(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            print('共耗时约 {:.2f} 秒'.format(time.time() - start))
            return res
    
        return wrapper
    @timer
    def batch_data():
        """ 批量写入数据 """
        # 分批写
        # for i in range(1, 10000001, 10000):
        #     action = [{
        #         "_index": "s2",
        #         "_type": "doc",
        #         "_source": {
        #             "title": k
        #         }
        #     } for k in range(i, i + 10000)]
        #     helpers.bulk(es, action)
        # 使用生成器
        for i in range(1, 100001, 1000):
            action = ({
                "_index": "s2",
                "_type": "doc",
                "_source": {
                    "title": k
                }
            } for k in range(i, i + 1000))
            helpers.bulk(es, action)
    
    if __name__ == '__main__':
        # create_data()
        batch_data()
    

    注释的内容是使用列表完成,然后使用生成器完成。结果耗时约93.53 秒。

    较劲,我就想一次写入一千万条#top

    经过洒家多年临床经验发现,程序员为什么掉头发?都是因为爱较劲!
    上面的例子已经不错了,但是仔细观察,还是使用了两次for循环,但是代码可否优化,答案是可以的,我们直接使用生成器:

    Copy
    import time
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    
    es = Elasticsearch()
    
    def timer(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            print('共耗时约 {:.2f} 秒'.format(time.time() - start))
            return res
    
        return wrapper
    @timer
    def gen():
        """ 使用生成器批量写入数据 """
        action = ({
            "_index": "s2",
            "_type": "doc",
            "_source": {
                "title": i
            }
        } for i in range(100000))
        helpers.bulk(es, action)
    
    if __name__ == '__main__':
        # create_data()
        # batch_data()
        gen()
    

    我们将生成器交给es去处理,这样,Python的压力更小了,你要说Java虚拟机不是压力更大了,无论是分批处理还是使用生成器,虚拟机的压力都不小,写入操作本来就耗时嘛!上例测试结果大约是耗时90秒钟,还行,一千万的任务还是留给你去测试吧!


    欢迎斧正,that's all

    作者: 听雨危楼

    出处:https://www.cnblogs.com/Neeo/articles/10788573.html

     

  • 相关阅读:
    Android 五大布局
    jdk6的安装以及环境变量的设置
    PLSQL Developer图形化窗口创建数据库全过程
    未能加载文件或程序集“Oracle.DataAccess, " 64位OS运行32位程序的问题
    Android SDK 无法连接到GOOGLE 下载安装包
    Android开发之旅:环境搭建
    Android开发把项目打包成apk
    在 VMware Workstation 虚拟机中创建共享文件夹的步骤〔图解〕
    谈谈对于企业级系统架构的理解
    C#图片处理之: 获取数码相片的EXIF信息
  • 原文地址:https://www.cnblogs.com/xiondun/p/13455621.html
Copyright © 2011-2022 走看看