zoukankan      html  css  js  c++  java
  • ElasticSearch使用

    插入数据

    示例

    #!/usr/bin/python
    # -*- coding:utf-8 -*-
    
    import os
    import re
    import json
    import time
    import elasticsearch
    
    from elasticsearch.helpers import bulk
    from multiprocessing import Pool
    
    
    def add_one(file_path, doc_type, index):
        """准备插入一条"""
        print doc_type, index
        es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])
        with open(file_path, "r") as f:
            for line in f:
                try:
                    line = re.sub("
    ", "", line)
                    dict_obj = json.loads(line)
                    es_client.index(index=index, doc_type=doc_type, body=dict_obj)
                except Exception as e:
                    print "出错了, 错误信息: {}".format(e)
    
    def add_bulk(doc_type, file_path, bulk_num, index):
        """"""
        es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])
        action_list = []
        global param
    
        with open(file_path, "r") as f:
    
            for line in f:
                # 去除每一行数据中的"
    "字符, 也可以替换为"\n"
                line = line.replace("
    ", "")
                dict_obj = json.loads(line)
    
                data = {}
                # 第一层
                for key_1, value_1 in dict_obj.iteritems():
                    data[key_1] = {}
                    if isinstance(value_1, dict):
                        # 第二层
                        for key_2, value_2 in value_1.iteritems():
                            data[key_1][key_2] = {}
                            if isinstance(value_2, dict):
                                # 第三层
                                for key_3, value_3 in value_2.iteritems():
                                    data[key_1][key_2][key_3] = {}
                                    if isinstance(value_3, dict):
                                        # 第四层
                                        for key_4, value_4 in value_3.iteritems():
                                            if isinstance(value_4, dict):
                                                value_4 = json.dumps(value_4)
                                            data[key_1][key_2][key_3][key_4] = value_4
                                    else:
                                        data[key_1][key_3] = value_3
                            else:
                                data[key_1][key_2] = value_2
                    else:
                        data[key_1] = value_1
    
                # 根据bulk_num的值发送一个批量插入请求
                action_list.append({
                    "_op_type": "index",
                    "_index": index,
                    "_type": doc_type,
                    "_source": data,
                })
    
                if len(action_list) >= bulk_num:
                    try:
                        print "Start Bulk {}...".format(doc_type)
                        success, failed = bulk(es_client, action_list, index=index, raise_on_error=True, request_timeout=300)
                        print "End Bulk {}...".format(doc_type)
                    except Exception as e:
                        print "出错了, Type:{}, 错误信息:{}".format(doc_type, e[0])
                    finally:
                        del action_list[0:len(action_list)]
    
            # 如果不是bulk_num的等值, 那么就判断列表是否为空, 再次发送一次请求
            if len(action_list) > 0:
                try:
                    print "Start Bulk {}...".format(doc_type)
                    success, failed = bulk(es_client, action_list, index=index, raise_on_error=True, request_timeout=300)
                    print "End Bulk {}...".format(doc_type)
                except Exception as e:
                    print "出错了, Type:{}, 错误信息:{}".format(doc_type, e[0])
                finally:
                    del action_list[0:len(action_list)]
    
    def mulit_process(path, index, bulk_num, data):
        """"""
        # 多进程执行
        pool = Pool(10)
    
        results = []
        for i in data:
            doc_type = i["doc_type"]
            file_path = i["file_path"]
            result = pool.apply_async(add_bulk, args=(doc_type, file_path, bulk_num, index))
            results.append(result)
        
        pool.close()
        pool.join()
    
        # for result in results:
        #     print result.get()
    
    def all_info(path):
        data = []
        for i in os.listdir(path):
            file_dict = {}
            if i.endswith(".json"):
                doc_type = i.split("_")[0]
                file_path = path + i
                file_dict["doc_type"] = doc_type
                file_dict["file_path"] = file_path
                data.append(file_dict)
    
        return data
    
    def es_insert(process_func=None):
        """"""
        # 库
        index = "test"
        # 文件路径
        path="/home/data"
        
        # 批量插入的数量, 如果是json整条数据插入的话, 可能会出现字段过长的问题, 导致插不进去, 适当调整bulk_num的值
        bulk_num = 5000
    
        if not path.endswith("/"):
            path += "/"
    
        data = all_info(path)
    
        if process_func == "bulk":
            # 插入多条, doc_type, file_path, bulk_num, index
            add_bulk("80", path + "80_result.json", bulk_num, index)
        elif process_func == "one":
            # 插入单条file_path, doc_type, index
            add_one(path + "80_result.json", "80", index)
        else:
            # 多进程
            mulit_process(path, index, bulk_num, data)
    
    if __name__ == "__main__":
        # 计算脚本执行时间
        start_time = time.time()
    
        # 插入数据
        es_insert()
    
        # 计算脚本执行时间
        end_time = time.time()
        print end_time - start_time
    

    查询数据

    curl -XPOST "0.0.0.0:9200/test/_search?pretty" -d '{"query":{"bool":{"must":[{"query_string":{"default_field":"ip","query":"40.74.78.234"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"aggs":{}}'

    删除索引

    curl -X DELETE "localhost:9200/index"

    index索引名称库名

    删除type

    curl -XPOST 'localhost:9200/index/type/_delete_by_query?conflicts=proceed&pretty' -H 'Content-Type: application/json' -d'{"query": {"match_all": {}}}'

    index索引名称库名, type类型

  • 相关阅读:
    qsort()的使用
    c语言不寻常的类型转换(类型提升)
    堆栈段的三个主要用途
    区分 声明与定义
    宏定义陷阱与typedef
    约瑟夫环解决方案
    线程中断测试
    Redis
    本地缓存
    tomcat优化
  • 原文地址:https://www.cnblogs.com/zzhaolei/p/11067975.html
Copyright © 2011-2022 走看看