  • Python: saving data to Elasticsearch in batches with bulk

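    I had been using kclpy to read a Kinesis stream, process the records, and save them to Elasticsearch. Then I noticed that the data shown in ELK was falling further and further behind the current time (the source data volume had tripled). Following the Kinesis documentation, I increased the number of shards and instances, but neither change helped noticeably.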

    I then reworked the code to write to Elasticsearch in batches with bulk, and the write rate improved markedly.
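    The speed-up comes from cutting HTTP round trips. Instead of one index request per document, the `_bulk` endpoint accepts many documents in a single request whose body is newline-delimited JSON: for each document, an action/metadata line followed by the document source line, with a trailing newline at the end. `elasticsearch.helpers.bulk` builds this body for you; the sketch below builds it by hand only to show the format. `build_bulk_body` is a hypothetical helper, and it assumes every document goes to a single index with the default `index` operation.

```python
import json
from datetime import date, datetime
from typing import Any, Dict, Iterable


def _json_default(value: Any) -> str:
    # Datetimes (such as @timestamp in the example) become ISO 8601 strings
    if isinstance(value, (date, datetime)):
        return value.isoformat()
    raise TypeError("cannot serialize %r" % (value,))


def build_bulk_body(index: str, docs: Iterable[Dict[str, Any]]) -> str:
    """Return an NDJSON body for the _bulk endpoint (hypothetical helper).

    Each document contributes two lines: an action/metadata line naming the
    target index, then the document source. Every line ends with a newline,
    including the last one, as the _bulk API requires.
    """
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(doc, default=_json_default))
    return "".join(line + "\n" for line in lines)


if __name__ == "__main__":
    print(build_bulk_body("kinesis-2018.11.28", [{"value": "bulk"}, {"value": "test"}]), end="")
```

    For example, `build_bulk_body("kinesis-2018.11.28", [{"value": "bulk"}])` returns two lines, `{"index": {"_index": "kinesis-2018.11.28"}}` and `{"value": "bulk"}`, each terminated by a newline.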

    Example code:

    from datetime import datetime
    import time

    import pytz
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    # elasticsearch-py 7.x style connection; replace "ip", "username" and "password"
    es = Elasticsearch(hosts=[{'host': "ip", 'port': 9200}], http_auth=("username", "password"))

    TZ = pytz.timezone('Asia/Shanghai')
    INDEX_NAME = "kinesis-2018.11.28"  # one index name, used for every action


    def to_local_dt(epoch_seconds):
        """Convert a Unix timestamp in seconds to an Asia/Shanghai datetime."""
        return datetime.fromtimestamp(int(epoch_seconds), TZ)


    def index_bulk():
        actions = []
        for i in range(500):
            t = time.time()
            # Synthetic test record; in the Kinesis consumer this dict would be
            # parsed from the record payload with json.loads()
            kinesisdict = {
                "priority": 0,
                "tags": {i},
                "threshold": 0,
                "kinesis": True,
                "env": "test",
                "region": "cn",
                "metric": "/var/log/sengled/bulk.log",
                "dataSource": "bulk",
                "service": "bulk",
                "status": "",
                "endpoint": "test-cn-inception-10.12.112.165",
                "starttime": t,
                "product": "bulk",
                "step": 0,
                "value": "bulk",
                "ip": "10.12.112.165",
                "objectType": "dev",
                "endtime": t,
                "timestamp": t,
                "counterType": ""
            }

            kdict = kinesisdict.copy()
            kdict['@timestamp'] = to_local_dt(kinesisdict['timestamp'])
            # When starttime/endtime is 0, fall back to the record timestamp
            kdict['starttime'] = to_local_dt(kinesisdict['starttime'] or kinesisdict['timestamp'])
            kdict['endtime'] = to_local_dt(kinesisdict['endtime'] or kinesisdict['timestamp'])

            # Store value, threshold and tags as strings
            kdict['value'] = str(kinesisdict['value'])
            kdict['threshold'] = str(kinesisdict['threshold'])
            kdict['tags'] = str(kinesisdict['tags'])
            del kdict['timestamp']

            actions.append({
                "_index": INDEX_NAME,
                # Mapping types were deprecated in Elasticsearch 7 and removed in 8;
                # drop "_type" on newer clusters
                "_type": "kinesisdata",
                "_source": kdict,
            })

        # One call sends all actions through the _bulk API instead of 500 index() calls.
        # With raise_on_error=True a failed document raises BulkIndexError.
        success, _ = bulk(es, actions, raise_on_error=True)
        print("insert %s lines" % success)


    index_bulk()
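    The example builds all 500 actions in one list. A kclpy consumer receives records continuously instead, so one option is to buffer actions and send them whenever a batch fills up. The sketch below is a hypothetical `BulkBuffer` class (not part of elasticsearch-py or kclpy). Its `flush_fn` is injected so the class can be tried without a cluster; in production it would be something like `lambda batch: bulk(es, batch)`.

```python
from typing import Any, Callable, Dict, List

Action = Dict[str, Any]


class BulkBuffer:
    """Collect bulk actions and pass them to flush_fn in fixed-size batches.

    Hypothetical helper: flush_fn is assumed to be something like
    lambda batch: elasticsearch.helpers.bulk(es, batch).
    """

    def __init__(self, flush_fn: Callable[[List[Action]], Any], batch_size: int = 500) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")
        self._flush_fn = flush_fn
        self._batch_size = batch_size
        self._pending: List[Action] = []

    def add(self, action: Action) -> None:
        """Queue one action and send a full batch as soon as one is ready."""
        self._pending.append(action)
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Send whatever is queued, e.g. before checkpointing or on shutdown."""
        if self._pending:
            batch, self._pending = self._pending, []
            self._flush_fn(batch)


if __name__ == "__main__":
    sent_sizes: List[int] = []
    buf = BulkBuffer(lambda batch: sent_sizes.append(len(batch)), batch_size=200)
    for i in range(450):
        buf.add({"_index": "kinesis-test", "_source": {"n": i}})
    buf.flush()  # push the trailing partial batch
    print(sent_sizes)  # [200, 200, 50]
```

    Calling `flush()` before the consumer checkpoints avoids marking records as processed while they are still sitting in the buffer. The `batch_size` default of 500 simply mirrors the example above and is worth tuning against your document size.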
    

    Reposted from: https://www.cnblogs.com/husbandmen/p/10033775.html

  • Original post: https://www.cnblogs.com/davis12/p/14787084.html