  • Python: saving data to Elasticsearch in batches with bulk

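    I previously used kclpy to read data from a Kinesis stream, process it, and save it to Elasticsearch. Then I noticed that the data shown in ELK was falling further and further behind the current time (the source data volume had tripled). Following the Kinesis documentation, I increased the number of shards and instances, but neither change had a noticeable effect.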

    I then reworked the code to write to Elasticsearch in batches with bulk, and the indexing rate improved significantly.
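    Batching helps mainly because indexing one document at a time costs one HTTP round trip per document, while the bulk API carries many documents in a single request. The sketch below only counts requests and does not measure real latency. It is plain Python with no Elasticsearch dependency; chunked and count_requests are hypothetical helpers, and the batch size of 500 mirrors the default chunk_size of elasticsearch.helpers.bulk.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def count_requests(num_docs: int, chunk_size: int) -> int:
    """Number of requests needed to send num_docs documents in batches of chunk_size."""
    return sum(1 for _ in chunked(range(num_docs), chunk_size))


# One request per document vs. one request per batch of 500 documents
per_document = count_requests(1500, 1)
batched = count_requests(1500, 500)
print(per_document, batched)  # 1500 3
```

    In a Kinesis consumer, the same idea means collecting the decoded records from each batch and flushing them with one bulk() call, instead of calling es.index() once per record.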

    Example code:

    from datetime import datetime
    import time

    import pytz
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    # Placeholders: replace "ip", "username" and "password" with real values.
    # Written for the 6.x/7.x Python client; the port is an integer.
    es = Elasticsearch(hosts=[{'host': "ip", 'port': 9200}], http_auth=("username", "password"))

    TZ = pytz.timezone('Asia/Shanghai')
    # Each action names its own index. The original also passed a different
    # index= to bulk(), which was misleading because the per-action _index wins.
    INDEX = "kinesis-2018.07.19"


    def to_datetime(ts):
        """Convert a Unix timestamp (truncated to seconds) to an Asia/Shanghai datetime."""
        return datetime.fromtimestamp(int(ts), TZ)


    def index_bulk():
        actions = []
        for i in range(500):
            t = time.time()
            kinesisdict = {
                "priority": 0,
                "tags": {i},
                "threshold": 0,
                "kinesis": True,
                "env": "test",
                "region": "cn",
                "metric": "/var/log/sengled/bulk.log",
                "dataSource": "bulk",
                "service": "bulk",
                "status": "",
                "endpoint": "test-cn-inception-10.12.112.165",
                "starttime": t,
                "product": "bulk",
                "step": 0,
                "value": "bulk",
                "ip": "10.12.112.165",
                "objectType": "dev",
                "endtime": t,
                "timestamp": t,
                "counterType": ""
            }

            kdict = kinesisdict.copy()
            kdict['@timestamp'] = to_datetime(kinesisdict['timestamp'])
            # A starttime/endtime of 0 means "not set": fall back to timestamp
            kdict['starttime'] = to_datetime(kinesisdict['starttime'] or kinesisdict['timestamp'])
            kdict['endtime'] = to_datetime(kinesisdict['endtime'] or kinesisdict['timestamp'])

            # Convert these fields to strings (a set such as tags is not JSON-serializable)
            kdict['value'] = str(kinesisdict['value'])
            kdict['threshold'] = str(kinesisdict['threshold'])
            kdict['tags'] = str(kinesisdict['tags'])
            del kdict['timestamp']

            actions.append({
                "_index": INDEX,
                # Mapping types were deprecated in Elasticsearch 7.0 and
                # removed in 8.0; drop this key on newer clusters.
                "_type": "kinesisdata",
                "_source": kdict
            })

        # One call for all documents: helpers.bulk splits them into chunks
        # (500 actions by default) and returns (success_count, errors).
        success, _ = bulk(es, actions, raise_on_error=True)
        print("insert %s lines" % success)


    index_bulk()
    
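    The per-record conversion inside index_bulk can also be pulled out into a plain function and tested without a cluster. The sketch below is one possible refactoring, not the author's code: transform is a hypothetical name, and instead of pytz it assumes a fixed UTC+8 offset, which matches Asia/Shanghai for any timestamp after 1991 (China has not observed daylight saving time since then).

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

# Assumption: fixed UTC+8 offset standing in for Asia/Shanghai (valid after 1991)
CST = timezone(timedelta(hours=8))


def to_datetime(ts: float) -> datetime:
    """Truncate a Unix timestamp to whole seconds and attach the UTC+8 offset."""
    return datetime.fromtimestamp(int(ts), CST)


def transform(record: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: reshape one record the way index_bulk does."""
    doc = dict(record)
    ts = doc.pop("timestamp")
    doc["@timestamp"] = to_datetime(ts)
    # 0 means "not set": fall back to the record timestamp
    doc["starttime"] = to_datetime(record["starttime"] or ts)
    doc["endtime"] = to_datetime(record["endtime"] or ts)
    # Stringify the same fields as the original code
    for key in ("value", "threshold", "tags"):
        doc[key] = str(record[key])
    return doc


doc = transform({"timestamp": 1543363200, "starttime": 0, "endtime": 1543363260,
                 "value": "bulk", "threshold": 0, "tags": 7})
print(doc["@timestamp"].isoformat())  # 2018-11-28T08:00:00+08:00
```

    Keeping the transformation pure like this makes it easy to unit-test the field handling separately from the bulk upload.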

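    For reference, helpers.bulk serializes the action dicts into the newline-delimited JSON (NDJSON) body of the _bulk endpoint: one action/metadata line followed by one source line per document, with a trailing newline. The sketch below builds such a body by hand using only the standard library. to_ndjson is a hypothetical helper, not part of elasticsearch-py, and it handles only plain index actions whose sources are already JSON-serializable (the datetime values in the code above would first need converting, for example with isoformat()).

```python
import json
from typing import Dict, List


def to_ndjson(actions: List[Dict]) -> str:
    """Build a _bulk request body from action dicts with _index/_id/_source keys."""
    lines = []
    for action in actions:
        meta = {k: v for k, v in action.items() if k in ("_index", "_id")}
        lines.append(json.dumps({"index": meta}))   # action/metadata line
        lines.append(json.dumps(action["_source"]))  # document source line
    if not lines:
        return ""
    # The bulk API requires the body to end with a newline
    return "\n".join(lines) + "\n"


body = to_ndjson([
    {"_index": "kinesis-2018.07.19", "_source": {"service": "bulk", "step": 0}},
    {"_index": "kinesis-2018.07.19", "_source": {"service": "bulk", "step": 1}},
])
print(body, end="")
```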
    Reposted from: https://www.cnblogs.com/husbandmen/p/10033775.html

  • Original article: https://www.cnblogs.com/davis12/p/14787084.html