zoukankan      html  css  js  c++  java
  • Python 批量插入ES

      使用Python批量插入数据到ES中,如果是一条条插入,会发现效率很低,这时需要使用ES的批量插入bulk的功能。

      以下示例代码,是将masscan输出的结果文件,抽取ip,port,和时间戳,插入到es中的。

    #!/usr/bin/python
    # coding=utf-8
    
    import json
    import time
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    import ssl
    
    es = Elasticsearch(
        [{"host": "xx.xx.xx.xx", "port": "xx"}])
    
    print(es.info())
    
    
    # 添加timestamp
    time_now = int(time.time())
    time_local = time.localtime(time_now)
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
    date_t, time_t = timestamp.split(' ')
    time_format = '{}T{}.000Z'.format(date_t, time_t)
    print(time_format)
    
    
    ip_ports = []
    # 提取 masscan.json 中的 ip:port 信息
    
    
    def handle_masscan(target):
        index = 0
        with open(target, 'r') as f:
            for line in f:
                index += 1
                if line.startswith('{ '):
                    temp = json.loads(line[:-2])
                    ip = str(temp["ip"]).strip()
                    port = str(temp["ports"][0]["port"]).strip()
                    ip_port = [ip, port]
                    ip_ports.append(ip_port)
    
    
    def timer(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            res = func(*args, **kwargs)
            print('共耗时约 {:.2f} 秒'.format(time.time() - start))
            return res
    
        return wrapper
    
    @timer
    def gen():
        actions = []
        for line in ip_ports:
            # 拼接插入数据结构
            action = {
                "_index": "server_port_info_2020_q4",
                "_type": "doc",
                "_source": {
                    "ip": line[0],
                    "port": line[1],
                    "@timestamp": time_format,
                }
            }
            actions.append(action)
        g(es, actions)
    
    
    if __name__ == '__main__':
        target = '../port_info_2_es/masscan.json'
        handle_masscan(target)
        gen()
        pass

    参考:

      Elasticsearch - 使用Python批量写入数据:

        https://www.cnblogs.com/Neeo/articles/10788573.html

      使用Python-elasticsearch-bulk批量快速向elasticsearch插入数据:
        https://blog.csdn.net/weixin_39198406/article/details/82983256

      Bulk helpers:

        https://elasticsearch-py.readthedocs.io/en/7.10.0/helpers.html

    -------------------------------------------

    个性签名:如果世上的事都按你说的道理走 世界就不是现在这样了!

    如果觉得这篇文章对你有小小的帮助的话,记得在右下角点个“推荐”哦,博主在此感谢!

  • 相关阅读:
    Linux下调试caffe
    MXnet的使用
    Cmake的使用
    深度学习的移动端实现
    【WPF】面板布局介绍Grid、StackPanel、DockPanel、WrapPanel
    【WinForm】Dev ComboxEdit、BarManager的RepositoryItemComboBox、ComboBox操作汇总
    【WinForm】DataGridView使用小结
    【Linux】常用指令
    【c++】MFC 程序入口和执行流程
    【WPF】拖拽改变控件大小
  • 原文地址:https://www.cnblogs.com/Hi-blog/p/Using-Bulk-Write-Data-To-ES.html
Copyright © 2011-2022 走看看