zoukankan      html  css  js  c++  java
  • python批量插入数据到es和读取es数据

    一、插入数据

    1、首先准备类似如下的数据(每行一个 JSON 文档,保存为与脚本同目录下的 insert.json)

    {"_type": "type1", "_id": 1, "_index": "test", "_source": {"JOBNAME0": "guba_eastmoney_com_265162", "JOBNAME1": "guba_eastmoney_com_265162"}}

    2、调用es相关模块插入数据到es中

    #!/usr/bin/python
    import threading
    import queue
    import json
    import time
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    import os
    import sys
    
    # 
    
    # host_list = [
    #     {"host":"10.58.7.190","port":9200},
    #     {"host":"10.58.55.191","port":9200},
    #     {"host":"10.58.55.192","port":9200},
    # ]
    #
    host_list = [
        {"host":"10.87.7.190","port":9200},
    ]


    # Create an Elasticsearch client object.
    client = Elasticsearch(host_list)


    def _iter_actions(path):
        """Yield one bulk action (a dict) per non-blank line of the file at *path*.

        Each line is expected to hold one JSON document. Lines that are not
        valid JSON are reported on stderr and skipped, so a single bad line
        does not abort the whole import.
        """
        with open(path, "r", encoding="utf-8") as f:
            for line_no, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    action = json.loads(line)
                except ValueError as e:
                    sys.stderr.write(
                        "{0}: line {1}: invalid JSON: {2}\n".format(path, line_no, e)
                    )
                    continue
                yield action


    insert_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "insert.json")

    try:
        # `actions` must be an iterable of dicts. Passing the whole stream to a
        # single parallel_bulk call lets the helper batch documents into
        # chunks instead of sending one request per document.
        for ok, info in helpers.parallel_bulk(
            client=client,
            actions=_iter_actions(insert_path),
            thread_count=1,
            raise_on_error=False,
            raise_on_exception=False,
        ):
            if not ok:
                # With raise_on_error/raise_on_exception disabled, failures
                # are reported per document rather than raised.
                sys.stderr.write("failed to index document: {0}\n".format(info))
    except Exception as e:
        # sys.stderr is a file object, not a callable: write to it.
        sys.stderr.write("bulk insert aborted: {0}\n".format(e))

    3、查看es索引中的文档数

    [root@test1 cdrom]# curl -XGET 'http://10.87.7.190:9200/_cat/indices?v&pretty'
    health status index uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    yellow open   test  r91GhsFVT7iF6M3iAuNEKg   2   5      19362            0      1.3mb        499.7kb

     

    二、读取es的数据

    #!/usr/bin/python
    from kafka import KafkaProducer
    import threading
    import json
    import time
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    import os
    import sys
    import argparse
    import random
    
    def _next_scroll_page(es, scroll_id, retries=3):
        """Return the next scroll page, retrying a failed call up to *retries* times.

        The last exception is re-raised once all attempts have failed, so a
        page is never silently dropped from the counts.
        """
        attempt = 0
        while True:
            try:
                return es.scroll(scroll_id=scroll_id, scroll='1m')
            except Exception:
                attempt += 1
                if attempt >= retries:
                    raise


    def get_data_from_es(es=None, index='full_sight', size=100, expected_prefix='test'):
        """Count documents whose ``LASTOPER`` field equals ``expected_prefix + _id``.

        Every document of *index* is read through the scroll API, *size* hits
        per page. A document counts as correct when its ``_source`` has a
        ``LASTOPER`` value equal to ``expected_prefix + str(_id)``; any other
        document (including one without ``LASTOPER``) counts as an error.

        Args:
            es: client object providing ``search``, ``scroll`` and
                ``clear_scroll``. When ``None`` a client for the default
                host list is created.
            index: name of the index to scan.
            size: number of hits requested per page.
            expected_prefix: string prepended to the document id to build the
                expected ``LASTOPER`` value. The same rule is applied to every
                page.

        Returns:
            Tuple ``(err_data_num, correct_data_num)``.

        Raises:
            Exception: whatever the client raises when the initial search
                fails or a scroll call keeps failing after the retries.
        """
        if es is None:
            es = Elasticsearch([
                {"host": "10.58.55.1", "port": 9200},
                {"host": "10.58.55.2", "port": 9200},
                {"host": "10.58.55.7", "port": 9200},
            ])

        err_data_num = 0
        correct_data_num = 0
        scroll_id = None

        try:
            response = es.search(index=index, scroll='1m', size=size)
            while True:
                # Always continue from the id returned by the latest response.
                scroll_id = response['_scroll_id']
                hits = response['hits']['hits']
                if not hits:
                    # An empty page marks the end of the scroll; this avoids
                    # depending on the shape of hits.total.
                    break

                for hit in hits:
                    source = hit.get("_source") or {}
                    expected = expected_prefix + str(hit.get("_id"))
                    if source.get("LASTOPER") == expected:
                        correct_data_num += 1
                    else:
                        err_data_num += 1

                response = _next_scroll_page(es, scroll_id)
        finally:
            if scroll_id is not None:
                # Release the server-side scroll context instead of waiting
                # for it to time out; a cleanup failure must not hide the result.
                try:
                    es.clear_scroll(scroll_id=scroll_id)
                except Exception as e:
                    sys.stderr.write("failed to clear scroll: {0}\n".format(e))

        return err_data_num, correct_data_num
    
    if __name__ == '__main__':
        # Keep re-checking the index until no document is reported as not updated.
        finished = False
        while not finished:
            error, correct = get_data_from_es()
            print("未更新的数据的条数为:{num}".format(num=error))
            print("已更新的数据的条数为:{num}".format(num=correct))
            print("=" * 200)
            finished = int(error) == 0
  • 相关阅读:
    八数码难题 (codevs 1225)题解
    小木棍 (codevs 3498)题解
    sliding windows (poj 2823) 题解
    集合删数 (vijos 1545) 题解
    合并果子 (codevs 1063) 题解
    等价表达式 (codevs 1107)题解
    生理周期 (poj 1006) 题解
    区间 (vijos 1439) 题解
    区间覆盖问题 题解
    种树 (codevs 1653) 题解
  • 原文地址:https://www.cnblogs.com/bainianminguo/p/11655370.html
Copyright © 2011-2022 走看看