  • Common Elasticsearch query operations in Python scripts

    1. ES query operations from Python scripts, via HTTP (requests)

    #!coding:utf-8
    
    import json
    import logging
    import time
    
    import requests
    
    PAGE_RESULT_SCROLL_ID = 'scroll_id'
    PAGE_RESULT_SCROLL_SIZE = 'scroll_size'
    PAGE_RESULT_TOTAL_SIZE = 'total_size'
    PAGE_RESULT_HITS = 'hits'
    PAGE_RESULT_DATA = 'data'
    PAGE_RESULT_CONVERT_DATA = 'convert_data'
    
    CONVERT_DEST_KEY = 'dest_key'
    CONVERT_DEFAULT_VALUE = 'default_value'
    
    current_time = time.strftime("%Y-%m-%d-%H-%M", time.localtime(time.time()))
    # Logging setup
    log_file = "operate_es_" + current_time + ".log"
    file_handler = logging.FileHandler(filename=log_file, encoding='utf-8')
    logging.basicConfig(level=logging.INFO, handlers=[file_handler])
    
    
    # Create an index
    def create_index(es_url, index_name, es_mapping):
        logging.info("es_url:%s index_name:%s " % (es_url, index_name))
    
        es_index_url = es_url + index_name
        logging.info("es_index_url: %s" % (es_index_url))
        r = requests.put(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
        response_json = json.loads(r.text)
    
        if 200 == r.status_code:
            if response_json['acknowledged']:
                logging.info("index_name: %s created successfully" % (index_name))
        else:
            logging.info("index_name: %s creation failed" % (index_name))
            logging.error("error info: %s" % r.text.encode(encoding="utf-8"))
            return -1
    
        logging.info("index create done!")
        return 0
    
    
    # Create indices in batch
    def batch_create_index_with_mapping(es_url, es_index_prefix, es_mapping, batch_num=1, start_index=0):
        logging.info("es_url:" + es_url)
        new_index_array = []
        for i in range(start_index, batch_num):
            suffix = "%03d" % i
            index_name = es_index_prefix + suffix
            es_index_url = es_url + index_name
            logging.info("es_index_url: %s" % (es_index_url))
            r = requests.put(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
            response_json = json.loads(r.text)
    
            if 200 == r.status_code:
                if response_json['acknowledged']:
                    logging.info("index_name: %s 创建成功" % (index_name))
                    new_index_array.append(index_name)
    
            else:
                logging.info("index_name: %s 创建失败" % (index_name))
                logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
                break
    
        logging.info("index create done!")
        logging.info("new_index_array:%s" % (new_index_array))
    
    
    # Delete an index
    def delete_index(es_url, delete_index):
        delete_es_url = es_url + delete_index
        logging.info("es_url:%s delete_index:%s start..." % (delete_es_url, delete_index))
    
        r = requests.delete(delete_es_url, headers={"content-type": "application/json"})
        if 200 == r.status_code:
            response_json = json.loads(r.text)
            logging.info("delete response: %s " % r.text.encode(encoding="utf-8"))
        else:
            logging.info("es_url:%s delete_index: %s deletion failed" % (es_url, delete_index))
            logging.error("error info: %s" % r.text.encode(encoding="utf-8"))
            return -1
        return 0
    
    
    # Delete indices by prefix (wildcard)
    def del_index_by_prefix(es_url, index_prefix):
        delete_es_url = es_url + index_prefix + "*"
        logging.info("es_url:%s  start..." % (delete_es_url))
        r = requests.delete(delete_es_url, headers={"content-type": "application/json"})
        if 200 == r.status_code:
            response_json = json.loads(r.text)
            logging.info("delete response: %s " % r.text.encode(encoding="utf-8"))
        else:
            logging.info("delete_es_url:%s deletion failed" % (delete_es_url))
            logging.error("error info: %s" % r.text.encode(encoding="utf-8"))
            return -1
        return 0
    
    
    # Batch-delete bucketed indices by prefix + numeric suffix
    def batch_es_del_index(es_url, index_prefix, bucket_num, start_index=0):
        logging.info("es_url:" + es_url)
        delete_index_array = []
        for i in range(start_index, bucket_num):
            suffix = "%03d" % i
            del_index_name = index_prefix + suffix
            ret = delete_index(es_url, del_index_name)
            if 0 == ret:
                delete_index_array.append(del_index_name)
            else:
                logging.info("delete_index:%s failed" % (del_index_name))
                break
    
        logging.info("batch_es_del_index done!")
        logging.info("delete_index_array:%s" % (delete_index_array))
    
    
    # Update an index mapping (add properties)
    def add_properties_to_index(es_url, index_name, doc_type, es_mapping):
        logging.info("es_url:" + es_url)
    
        es_index_url = es_url + index_name + '/' + doc_type + '/_mapping'
        logging.info("es_index_url: %s" % (es_index_url))
        r = requests.post(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
        response_json = json.loads(r.text)
    
        if 200 == r.status_code:
            if response_json['acknowledged']:
                logging.info("index_name: %s updated successfully" % (index_name))
        else:
            logging.info("index_name: %s update failed" % (index_name))
            logging.error("error info: %s" % r.text.encode(encoding="utf-8"))
            return -1
    
        logging.info("index modify done!")
        return 0
    
    
    # Batch-update index mappings
    def batch_add_properties_to_index(es_url, index_prefix, doc_type, es_mapping, bucket_num, start_index=0):
        logging.info("es_url:" + es_url)
        new_index_array = []
        for i in range(start_index, bucket_num):
            suffix = "%03d" % i
            index_name = index_prefix + suffix
            es_index_url = es_url + index_name + '/' + doc_type + '/_mapping'
            logging.info("es_index_url: %s" % (es_index_url))
            r = requests.post(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
            response_json = json.loads(r.text)
    
            if 200 == r.status_code:
                if response_json['acknowledged']:
                    logging.info("index_name: %s updated successfully" % (index_name))
                    new_index_array.append(index_name)

            else:
                logging.info("index_name: %s update failed" % (index_name))
                logging.error("error info: %s" % r.text.encode(encoding="utf-8"))
                logging.info("index modify done new_index_array:%s" % (new_index_array))
                return -1
    
        logging.info("index modify done!")
        logging.info("new_index_array:%s" % (new_index_array))
        return 0
    
    
    # Back up an index (structure + data), setting routing from a source field
    def es_reindex_with_routing(es_url, source_index, bak_index, query, routing_field):
        # copy the data with the _reindex API
        url = es_url + "_reindex"
        data = {
            "source": {
                "index": source_index,
                "query": query
            },
            "dest": {
                "index": bak_index
            },
            "script": {
                "inline": "ctx._routing = ctx._source." + routing_field,
                "lang": "painless"
            }
        }
        logging.info("source_index:%s to bak_index: %s start..." % (source_index, bak_index))
        r = requests.post(url, json.dumps(data), headers={"content-type": "application/json"})
        if 200 == r.status_code:
            response_json = json.loads(r.text)
            logging.info("reindex response: %s " % r.text.encode(encoding="utf-8"))
            logging.info("source_index:%s to bak_index: %s copied successfully" % (source_index, bak_index))
            logging.info("backup index: %s , documents copied: %d" % (bak_index, response_json['total']))
        else:
            logging.info("source_index:%s bak_index: %s backup failed" % (source_index, bak_index))
            logging.error("error info: %s" % r.text.encode(encoding="utf-8"))
            return -1
        return 0
    
    
    # Back up an index
    def es_reindex(es_url, source_index, bak_index, query):
        # copy the data with the _reindex API
        url = es_url + "_reindex"
        data = {
            "source": {
                "index": source_index,
                "query": query
            },
            "dest": {
                "index": bak_index
            }
        }
        logging.info("source_index:%s to bak_index: %s start..." % (source_index, bak_index))
        r = requests.post(url, json.dumps(data), headers={"content-type": "application/json"})
        if 200 == r.status_code:
            response_json = json.loads(r.text)
            logging.info("reindex response: %s " % r.text.encode(encoding="utf-8"))
            logging.info("source_index:%s to bak_index: %s copied successfully" % (source_index, bak_index))
            logging.info("backup index: %s , documents copied: %d" % (bak_index, response_json['total']))
        else:
            logging.info("source_index:%s bak_index: %s backup failed" % (source_index, bak_index))
            logging.error("error info: %s" % r.text.encode(encoding="utf-8"))
            return -1
        return 0
    
    
    # Batch backup
    def batch_es_reindex(es_url, source_index_prefix, bak_index_prefix, queryParam, bucket_num, start_index=0):
        '''
        :param es_url:
        :param source_index_prefix:
        :param bak_index_prefix:
        :param queryParam:
        queryParam = {
                "query": {
                    "bool": {
                        "must": [
                            {
                                "match_all": {}
                            }
                        ],
                        "must_not": [],
                        "should": []
                    }
                }
            }
        :param bucket_num:
        :param start_index:
        :return:
        '''
        logging.info("es_url:" + es_url)
        new_index_array = []
        for i in range(start_index, bucket_num):
            suffix = "%03d" % i
            source_index = source_index_prefix + suffix
            bak_index = bak_index_prefix + suffix
            ret = es_reindex(es_url, source_index, bak_index, queryParam["query"])
            if 0 == ret:
                new_index_array.append(bak_index)
            else:
                logging.info("source_index:%s to bak_index: %s copy failed" % (source_index, bak_index))
                logging.info("do new_index_array:%s over" % (new_index_array))
                return -1
    
        logging.info("batch_es_reindex done!")
        logging.info("new_index_array:%s" % (new_index_array))
        return 0
    
    
    # Batch backup with routing specified
    def batch_es_reindex_with_routing(es_url, source_index_prefix, bak_index_prefix, queryParam, routing_field, bucket_num,
                                      start_index=0):
        '''
        :param es_url:
        :param source_index_prefix:
        :param bak_index_prefix:
        :param queryParam:
        queryParam = {
                "query": {
                    "bool": {
                        "must": [
                            {
                                "match_all": {}
                            }
                        ],
                        "must_not": [],
                        "should": []
                    }
                }
            }
        :param bucket_num:
        :param routing_field:
        :param start_index:
        :return:
        '''
        logging.info("es_url:" + es_url)
        new_index_array = []
        for i in range(start_index, bucket_num):
            suffix = "%03d" % i
            source_index = source_index_prefix + suffix
            bak_index = bak_index_prefix + suffix
            ret = es_reindex_with_routing(es_url, source_index, bak_index, queryParam["query"], routing_field)
            if 0 == ret:
                new_index_array.append(bak_index)
            else:
                logging.info("source_index:%s to bak_index: %s copy failed" % (source_index, bak_index))
                logging.info("do new_index_array:%s over" % (new_index_array))
                return -1
    
        logging.info("batch_es_reindex done!")
        logging.info("new_index_array:%s" % (new_index_array))
        return 0
    
    
    # Create indices according to business rules
    def create_index_by_business_code_rel_type_dict(es_url, es_index_prefix, es_mapping,
                                                    business_codes_rel_type_dict,
                                                    do_reindex=False,
                                                    source_index='',
                                                    routing_field=''):
        '''
        :param es_url:
        :param es_index_prefix:
        :param es_mapping:
        :param business_codes_rel_type_dict:
        business_codes_rel_type_dict = {"003": ["fk_tl"],"004": ["bq_sb"],"005": [ "jd_zh", "jd_zz"]}
        :param do_reindex:
        :param source_index:
        :param routing_field:
        :return:
        '''
        logging.info("es_url:" + es_url)
        new_index_array = []
        business_codes = business_codes_rel_type_dict.keys()
    
        for business_code in business_codes:
            relation_types = business_codes_rel_type_dict.get(business_code)
            for relation_type in relation_types:
                index_name = es_index_prefix + business_code + "_" + relation_type
                es_index_url = es_url + index_name
                logging.info("es_index_url: %s" % (es_index_url))
                r = requests.put(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
                response_json = json.loads(r.text)
    
                if 200 == r.status_code:
                    if response_json['acknowledged']:
                        logging.info("index_name: %s created successfully" % (index_name))
                        new_index_array.append(index_name)
                        if do_reindex:
                            result = es_reindex_by_rel_type(es_url, source_index, index_name, relation_type, routing_field)
                            if 0 != result:
                                logging.info("do new_index_array:%s over" % (new_index_array))
                                return -1
                else:
                    logging.info("index_name: %s creation failed" % (index_name))
                    logging.error("error info: %s" % r.text.encode(encoding="utf-8"))
                    logging.info("do new_index_array:%s over" % (new_index_array))
                    return -1
        logging.info("index create done!")
        logging.info("new_index_array:%s" % (new_index_array))
        return 0
    
    
    # Back up an index (structure + data) filtered by relation type
    def es_reindex_by_rel_type(es_url, source_index, bak_index, rel_type, routing_field):
        # copy the data with the _reindex API
        url = es_url + "_reindex"
        data = {
            "source": {
                "index": source_index,
                "query": {
                    "term": {
                        "rel_type": rel_type.swapcase()
                    }
                }
            },
            "dest": {
                "index": bak_index
    
            },
            "script": {
                "inline": "ctx._routing = ctx._source." + routing_field,
                "lang": "painless"
            }
    
        }
        r = requests.post(url, json.dumps(data), headers={"content-type": "application/json"})
        if 200 == r.status_code:
            response_json = json.loads(r.text)
            logging.info("reindex response: %s " % r.text.encode(encoding="utf-8"))
            logging.info("backup index: %s , documents copied: %d" % (bak_index, response_json['total']))
        else:
            logging.info("source_index:%s bak_index: %s backup failed" % (source_index, bak_index))
            logging.error("error info: %s" % r.text.encode(encoding="utf-8"))
            return -1
        return 0
    
    
    # Scroll through all pages and collect the data
    def get_es_data_by_scroll(es_url, index, query={}, scroll_id=None, scroll="5m", batch_size=10000):
        data = []
        try:
            while True:
                if not scroll_id:
                    # number of documents fetched per batch
                    query["size"] = batch_size
                    curl_url = es_url + index + '/_search?scroll=' + scroll
                    logging.info("curl_url:%s" % (curl_url))
                    response = requests.post(curl_url, json.dumps(query), headers={'content-type': 'application/json'})
                else:
                    curl_url = es_url + '_search/scroll?scroll=' + scroll + '&scroll_id=' + scroll_id
                    logging.info("curl_url:%s" % (curl_url))
                    response = requests.get(curl_url)
                # handle the response
                if response:
                    if 200 == response.status_code:
                        response_json = json.loads(response.text)
                        scroll_id = response_json['_scroll_id']
                        # Update the scroll ID
                        if scroll_id is None:
                            break
                        # Get the number of results that we returned in the last scroll
                        if not response_json['hits']['hits']:
                            break
                        response_data = [doc["_source"] for doc in response_json['hits']['hits']]
                        data.extend(response_data)
                    else:
                        logging.info("curl_url:%s query failed" % (curl_url))
                        logging.error("error info: %s" % response.text.encode(encoding="utf-8"))
                        return None
                else:
                    logging.info("curl_url:%s query failed" % (curl_url))
                    return None
            logging.info("get data size:%s" % (len(data)))
            return data
        except Exception as e:
            logging.error(e)
            logging.error("query fail!")
            print("exception!")
            return None
    
    
    # Query (returns the first page of a scrolled search)
    def query(es_url, index, query, batch_size=1000, scroll="3m"):
        try:
            curl_url = es_url + index + '/_search?scroll=' + scroll
            # number of documents fetched per batch
            query["size"] = batch_size
            response = requests.post(curl_url, json.dumps(query), headers={'content-type': 'application/json'})
            if response:
                response_json = json.loads(response.text)
                return response_json
        except Exception as e:
            logging.error(e)
            logging.error("query fail!")
            print("exception!")
    
    
    # Paged query via scroll (returns the first page)
    def query_by_scroll(es_url, index, doc_type=None, query=None, scroll='5m', batch_size=1000):
        '''
        :param index:
        :param doc_type:
        :param query:
        queryParam = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "import_es_time": {
                                    "lt": "2019-07-31 00:00:00"
                                }
                            }
                        },
                        {
                            "term": {
                                "list_type": "01"
                            }
                        },
                        {
                            "term": {
                                "delete_status": "0"
                            }
                        }
                    ],
                    "must_not": [],
                    "should": []
                }
            }
        }
        :param scroll:
        :param batch_size:
        :return:
        '''
        try:
            logging.info("query: index:%s doc_type:%s scroll:%s batch_size:%s query:%s" % (
                index, doc_type, scroll, batch_size, query))
            # number of documents fetched per batch
            query["size"] = batch_size
            if doc_type:
                curl_url = es_url + index + '/' + doc_type + '/_search?scroll=' + scroll
            else:
                curl_url = es_url + index + '/_search?scroll=' + scroll
    
            response = requests.post(curl_url, json.dumps(query), headers={'content-type': 'application/json'})
    
            if response:
                if 200 == response.status_code:
                    response_json = json.loads(response.text)
                    return response_json
                else:
                    logging.info("curl_url:%s query: %s failed" % (curl_url, query))
                    logging.error("error info: %s" % response.text.encode(encoding="utf-8"))
                    return None
        except Exception as e:
            logging.error(e)
            logging.error("query fail!")
            print("query_by_scroll exception!")
            return None
    
    
    # Continue a paged query with an existing scroll_id
    def query_by_scroll_id(es_url, index, scroll_id, scroll='5m'):
        if scroll_id is None:
            return
        try:
            curl_url = es_url + '_search/scroll?scroll=' + scroll + '&scroll_id=' + scroll_id
            logging.info("curl_url:%s" % (curl_url))
            response = requests.get(curl_url)
            # handle the response
            if response:
                if 200 == response.status_code:
                    response_json = json.loads(response.text)
                    return response_json
                else:
                    logging.info("curl_url:%s query failed" % (curl_url))
                    logging.error("error info: %s" % response.text.encode(encoding="utf-8"))
                    return None
            else:
                logging.info("curl_url:%s query failed" % (curl_url))
                return None
        except Exception as e:
            logging.error(e)
            logging.error("query fail! scroll_id:%s" % (scroll_id))
            print("query_by_scroll_id exception!")
            return None
    
    
    # Fetch one scroll page and convert the data
    def get_and_parse_query_scroll_data(es_url, index, doc_type=None, query=None, scroll='5m', batch_size=1000,
                                        convert_dict={}, add_date_time=False):
        page = query_by_scroll(es_url, index, doc_type=doc_type, query=query, scroll=scroll)
        return convert_es_page_data(page, convert_dict, add_date_time)
    
    
    # Parse a scroll response page
    def parse_es_page_data(page):
        result_data = {}
        if not page or not page['_scroll_id']:
            logging.warning("query_by_scroll return none")
            print("query_by_scroll return none")
            return result_data
        if page['_scroll_id']:
            scroll_id = page['_scroll_id']
            result_data[PAGE_RESULT_SCROLL_ID] = scroll_id
            print("Scrolling scroll_id:%s" % (scroll_id))
        if page['hits']:
            total_size = page['hits']['total']
            print("Scrolling total_size:%s" % (total_size))
            result_data[PAGE_RESULT_TOTAL_SIZE] = total_size
            hits = page['hits']['hits']
            scroll_size = len(hits)
            result_data[PAGE_RESULT_SCROLL_SIZE] = scroll_size
            result_data[PAGE_RESULT_HITS] = hits
        return result_data
    
    
    # Convert page data according to business needs
    def convert_es_page_data(page, convert_dict={}, add_date_time=False):
        '''
        :param page:
        :param convert_dict:
        convert_dict example:
        {"key1": {"dest_key": ["key1","key2"], "default_value":""}}
        :param add_date_time:
        :return:
        '''
        result_data = parse_es_page_data(page)
        result = []
        if result_data and result_data.get(PAGE_RESULT_HITS):
            result = result_data[PAGE_RESULT_HITS]
        # parse data
        convert_data = []
        for item in result:
            if item['_source']:
                source_data = item['_source']
                convert_result = {}
                keys = convert_dict.keys()
                for source_key in keys:
                    dest_dict = convert_dict.get(source_key, [])
                    dst_keys = dest_dict.get(CONVERT_DEST_KEY, [])
                    default_value = dest_dict.get(CONVERT_DEFAULT_VALUE, '')
                    for dst_key in dst_keys:
                        convert_result[dst_key] = source_data.get(source_key, default_value)
                if add_date_time:
                    date_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
                    convert_result["date_time"] = date_time
                convert_str = json.dumps(convert_result, ensure_ascii=False)
                convert_data.append(convert_str.encode('utf-8'))
        result_data[PAGE_RESULT_CONVERT_DATA] = convert_data
        return result_data
    
    
    def main():
        # ES server address
        #  ## dev environment
        es_url = "http://192.168.3.63:9200/"

        #  ## test environment
        # es_url = "http://192.168.3.206:9200/"
        #  ## staging environment
        # es_url = "http://100.1.1.1:9200/"
        #  ## production environment
        # es_url = "http://10.1.1.1:9200/"
    
        BUCKET_NUM = 2
        INDEX_NAME = 'zyc_test'
        DOC_TYPE = 'relation'
        BAK_INDEX_NAME = 'backup' + INDEX_NAME
        INDEX_PREFIX = INDEX_NAME + '_'
        BAK_INDEX_PREFIX = BAK_INDEX_NAME + '_'
        ES_MAPPING = {
            "settings": {
                "index": {
                    "search": {
                        "slowlog": {
                            "threshold": {
                                "fetch": {
                                    "warn": "100ms"
                                },
                                "query": {
                                    "warn": "100ms"
                                }
                            }
                        }
                    },
                    "refresh_interval": "1s",
                    "indexing": {
                        "slowlog": {
                            "threshold": {
                                "index": {
                                    "warn": "1s"
                                }
                            }
                        }
                    },
                    "number_of_shards": "6",
                    "translog": {
                        "flush_threshold_size": "1gb",
                        "sync_interval": "120s",
                        "durability": "async"
                    }
                }
            },
            "aliases": {
                "vii_relation": {
    
                }
            },
            "mappings": {
                "relation": {
                    "dynamic_date_formats": [
                        "yyyy-MM-dd HH:mm:ss",
                        "yyyy-MM-dd"
                    ],
                    "dynamic_templates": [
                        {
                            "date_template": {
                                "match_pattern": "regex",
                                "mapping": {
                                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
                                    "type": "date"
                                },
                                "match_mapping_type": "string",
                                "match": "(.*_date|.*_timestamp|.*_ts|.*_time)"
                            }
                        },
                        {
                            "keyword_template": {
                                "mapping": {
                                    "type": "keyword"
                                },
                                "match_mapping_type": "string"
                            }
                        }
                    ],
                    "_all": {
                        "enabled": "false"
                    },
                    "properties": {
                        "pk": {
                            "type": "keyword"
                        },
                        "in_obj_type": {
                            "type": "keyword"
                        },
                        "in_obj_value": {
                            "type": "keyword"
                        },
                        "in_obj_type_value": {
                            "type": "keyword"
                        },
                        "in_obj_tag": {
                            "type": "keyword"
                        },
                        "in_obj_tag_desc": {
                            "type": "keyword"
                        },
                        "out_obj_type": {
                            "type": "keyword"
                        },
                        "out_obj_value": {
                            "type": "keyword"
                        },
                        "out_obj_type_value": {
                            "type": "keyword"
                        },
                        "out_obj_tag": {
                            "type": "keyword"
                        },
                        "out_obj_tag_desc": {
                            "type": "keyword"
                        },
                        "start_time": {
                            "type": "date",
                            "format": "yyyy-MM-dd HH:mm:ss"
                        },
                        "end_time": {
                            "type": "date",
                            "format": "yyyy-MM-dd HH:mm:ss"
                        },
                        "rel_type": {
                            "type": "keyword"
                        },
                        "rel_detail": {
                            "type": "keyword",
                            "index": "false"
                        },
                        "count": {
                            "type": "long"
                        },
                        "similarity": {
                            "type": "double"
                        },
                        "tag_codes": {
                            "type": "keyword"
                        },
                        "delete_status": {
                            "type": "integer"
                        },
                        "tenant_code": {
                            "type": "keyword"
                        },
                        "business_code": {
                            "type": "keyword"
                        },
                        "import_es_time": {
                            "type": "date",
                            "format": "yyyy-MM-dd HH:mm:ss"
                        }
    
                    }
                }
            }
        }
    
        ADD_ES_MAPPING = {
            "properties": {
                "delete_status": {
                    "type": "integer"
                },
                "start_time": {
                    "format": "yyyy-MM-dd HH:mm:ss",
                    "type": "date"
                },
                "end_time": {
                    "format": "yyyy-MM-dd HH:mm:ss",
                    "type": "date"
                }
            }
        }
        queryParam = {
            "query": {
                "bool": {
                    "must": [
                    ],
                    "must_not": [],
                    "should": []
                }
            }
        }
        QUERY_INDEX_NAME = 'cysaas_object_basic_info'
        QUERY_DOC_TYPE = 'basic'
        logging.info("begin...")
        create_index(es_url, INDEX_NAME, ES_MAPPING)
        time.sleep(5)
        delete_index(es_url, INDEX_NAME)
        time.sleep(5)
        batch_create_index_with_mapping(es_url, INDEX_PREFIX, ES_MAPPING)
        time.sleep(5)
        add_properties_to_index(es_url, INDEX_NAME, DOC_TYPE, ADD_ES_MAPPING)
        time.sleep(5)
        batch_add_properties_to_index(es_url, INDEX_PREFIX, DOC_TYPE, ADD_ES_MAPPING, 2)
        time.sleep(5)
        result = query_by_scroll(es_url, index=QUERY_INDEX_NAME, query=queryParam)
        convert_dict = {"obj_value": {"dest_key": ["obj_value", "value"], "default_value": ""}}
        result_data = get_and_parse_query_scroll_data(es_url, index=QUERY_INDEX_NAME, query=queryParam,
                                                      convert_dict=convert_dict)
        get_data = get_es_data_by_scroll(es_url, index=QUERY_INDEX_NAME, query=queryParam)
        del_index_by_prefix(es_url, INDEX_NAME)
        time.sleep(5)
        batch_es_del_index(es_url, INDEX_PREFIX, 2)
        logging.info("done")
        print("done")
    
    
    if __name__ == '__main__':
        main()
    

    2. ES query operations from Python scripts, via the elasticsearch client

    #!coding:utf-8
    
    import json
    import logging
    import time
    
    from elasticsearch import Elasticsearch, helpers
    
    PAGE_RESULT_SCROLL_ID = 'scroll_id'
    PAGE_RESULT_SCROLL_SIZE = 'scroll_size'
    PAGE_RESULT_TOTAL_SIZE = 'total_size'
    PAGE_RESULT_HITS = 'hits'
    PAGE_RESULT_DATA = 'data'
    PAGE_RESULT_CONVERT_DATA = 'convert_data'
    
    CONVERT_DEST_KEY = 'dest_key'
    CONVERT_DEFAULT_VALUE = 'default_value'
    
    current_time = time.strftime("%Y-%m-%d-%H-%M", time.localtime(time.time()))
    # Logging setup
    log_file = "operate_es_client_" + current_time + ".log"
    file_handler = logging.FileHandler(filename=log_file, encoding='utf-8')
    logging.basicConfig(level=logging.INFO, handlers=[file_handler])
    
    
    def query(es_client, index, query):
        try:
            return helpers.scan(es_client, index=index, scroll="3m", query=query)
        except Exception as e:
            logging.error(e)
            logging.error("query fail!")
            print("exception!")
    
    
    def query_by_scroll(es_client, index, doc_type=None, query=None, scroll='5m', batch_size=1000, preserve_order=False,
                        **kwargs):
        '''
        :param index:
        :param doc_type:
        :param query:
        queryParam = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "import_es_time": {
                                    "lt": "2019-07-31 00:00:00"
                                }
                            }
                        },
                        {
                            "term": {
                                "list_type": "01"
                            }
                        },
                        {
                            "term": {
                                "delete_status": "0"
                            }
                        }
                    ],
                    "must_not": [],
                    "should": []
                }
            }
        }
        :param scroll:
        :param batch_size:
        :param preserve_order:
        :param kwargs:
        :return:
        '''
        if not preserve_order:  # scan-style (unsorted) retrieval when ordering is not required
            kwargs['search_type'] = 'query_then_fetch'
        try:
            logging.info("query: index:%s doc_type:%s scroll:%s batch_size:%s query:%s" % (
                index, doc_type, scroll, batch_size, query))
            resp = es_client.search(index=index,
                                    doc_type=doc_type,
                                    scroll=scroll,
                                    size=batch_size,
                                    body=query,
                                    **kwargs)
            return resp
        except Exception as e:
            logging.error(e)
            logging.error("query fail!")
            print("exception!")
        return None
    
    
    def query_by_scroll_id(es_client, scroll_id, scroll='5m'):
        if scroll_id is None:
            return
        try:
            resp = es_client.scroll(scroll_id, scroll=scroll)
            return resp
        except Exception as e:
            logging.error(e)
            logging.error("query fail! scroll_id:%s" % (scroll_id))
            print("exception!")
        return None
    
    
    def get_and_parse_query_scroll_data(es_client, index, doc_type=None, query=None, scroll='5m', batch_size=1000,
                                        convert_dict={}, add_date_time=False):
        page = query_by_scroll(es_client, index, doc_type=doc_type, query=query, scroll=scroll)
        return convert_es_page_data(page, convert_dict, add_date_time)
    
    
    def parse_es_page_data(page):
        result_data = {}
        if not page or not page['_scroll_id']:
            logging.warning("query_by_scroll return none")
            print("query_by_scroll return none")
            return result_data
        if page['_scroll_id']:
            scroll_id = page['_scroll_id']
            result_data[PAGE_RESULT_SCROLL_ID] = scroll_id
            print("Scrolling scroll_id:%s" % (scroll_id))
        if page['hits']:
            total_size = page['hits']['total']
            print("Scrolling total_size:%s" % (total_size))
            result_data[PAGE_RESULT_TOTAL_SIZE] = total_size
            hits = page['hits']['hits']
            scroll_size = len(hits)
            result_data[PAGE_RESULT_SCROLL_SIZE] = scroll_size
            result_data[PAGE_RESULT_HITS] = hits
        return result_data
    
    
    def convert_es_page_data(page, convert_dict={}, add_date_time=False):
        '''
        :param page:
        :param convert_dict:
        convert_dict example:
        {"key1": {"dest_key": ["key1","key2"], "default_value":""}}
        :param add_date_time:
        :return:
        '''
        result_data = parse_es_page_data(page)
        result = []
        if result_data and result_data.get(PAGE_RESULT_HITS):
            result = result_data[PAGE_RESULT_HITS]
        # parse data
        convert_data = []
        for item in result:
            if item['_source']:
                source_data = item['_source']
                convert_result = {}
                keys = convert_dict.keys()
                for source_key in keys:
                    dest_dict = convert_dict.get(source_key, [])
                    dst_keys = dest_dict.get(CONVERT_DEST_KEY, [])
                    default_value = dest_dict.get(CONVERT_DEFAULT_VALUE, '')
                    for dst_key in dst_keys:
                        convert_result[dst_key] = source_data.get(source_key, default_value)
                if add_date_time:
                    date_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
                    convert_result["date_time"] = date_time
                convert_str = json.dumps(convert_result, ensure_ascii=False)
                convert_data.append(convert_str.encode('utf-8'))
        result_data[PAGE_RESULT_CONVERT_DATA] = convert_data
        return result_data
    
    
    def main():
        # ES server address
        #  ## dev environment
        es_host = "192.168.3.63"
        es_url = "http://192.168.3.63:9200/"
    
        INDEX_NAME = 'cysaas_object_basic_info'
        DOC_TYPE = 'basic'
    
        queryParam = {
            "query": {
                "bool": {
                    "must": [],
                    "must_not": [],
                    "should": []
                }
            }
        }
        logging.info("begin...")
        # es_client = Elasticsearch([{'host': es_host, 'port': '9200'}])
        es_client = Elasticsearch([es_url], verify_certs=False)
        result = query_by_scroll(es_client, index=INDEX_NAME, doc_type=DOC_TYPE, query=queryParam)
        time.sleep(5)
        result_data = get_and_parse_query_scroll_data(es_client, index=INDEX_NAME, doc_type=DOC_TYPE,
                                                      query=queryParam)
        logging.info("done")
        print("done")
    
    
    if __name__ == '__main__':
        main()
    

    scan avoids the sorting cost of scroll. The from/size pagination mode sorts the whole matching data set, which is very expensive. If we turn sorting off, all documents can be returned with very little overhead: scan simply skips the sort and just returns data from every shard that has results.
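
    To make the contrast concrete, here is a minimal sketch (not part of the original scripts; host and index name are reused from the examples above purely as assumptions) of a from/size page versus a _doc-sorted scan:

    from elasticsearch import Elasticsearch, helpers

    es_client = Elasticsearch(["http://192.168.3.63:9200/"])

    # from/size pagination: every page forces a global sort of the whole matching set
    page = es_client.search(index="cysaas_object_basic_info",
                            body={"query": {"match_all": {}}, "from": 5000, "size": 1000})

    # scan-style scroll: sorting by _doc skips scoring/sorting and streams every hit
    for hit in helpers.scan(es_client,
                            index="cysaas_object_basic_info",
                            query={"query": {"match_all": {}}, "sort": ["_doc"]},
                            scroll="5m",
                            size=1000):
        print(hit["_source"])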

    Below is the source code of the Python elasticsearch helpers.scan function. If you compare it against the basic usage of elasticsearch scroll/scan, the code is easy to follow. elasticsearch-py gathers the high-performance features in the helpers module, e.g. helpers.scan, helpers.reindex, helpers.streaming_bulk, helpers.bulk and helpers.parallel_bulk.
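
    For example, helpers.bulk takes an iterable of actions and chunks them into _bulk requests. A minimal sketch (the index name "zyc_test" comes from the first script and is only an assumption here):

    from elasticsearch import Elasticsearch, helpers

    es_client = Elasticsearch(["http://192.168.3.63:9200/"])

    actions = [
        {"_index": "zyc_test", "_source": {"pk": str(i), "rel_type": "fk_tl", "count": i}}
        for i in range(1000)
    ]

    # with raise_on_error=False, bulk() returns (number of successful actions, list of per-document errors)
    success, errors = helpers.bulk(es_client, actions, chunk_size=500, raise_on_error=False)
    print(success, errors)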

    elasticsearch.helpers.scan(client, query=None, scroll=u'5m', raise_on_error=True, preserve_order=False, **kwargs)
    
    
    
    Parameters:

    client – the elasticsearch connection object

    query – the elasticsearch DSL query body

    scroll – how long the scroll context should be kept alive on the server side

    raise_on_error – whether to raise an exception when some shards fail during the scan (instead of silently continuing)

    preserve_order – maps to search_type; whether the hits must be returned in their sorted order
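
    A minimal usage sketch of these parameters (index and field names are taken from the scripts above as assumptions):

    from elasticsearch import Elasticsearch, helpers

    es_client = Elasticsearch(["http://192.168.3.63:9200/"])

    # preserve_order=True keeps the query's sort order at the cost of scan performance;
    # raise_on_error=False keeps scrolling even if some shards fail
    hits = helpers.scan(es_client,
                        index="cysaas_object_basic_info",
                        query={"query": {"term": {"delete_status": "0"}}, "sort": ["import_es_time"]},
                        scroll="5m",
                        raise_on_error=False,
                        preserve_order=True)
    for hit in hits:
        print(hit["_id"], hit["_source"])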

    file: helpers/__init__.py

    # the default scroll is 5m; the default search_type is the scan (unsorted) mode
    
    def scan(client, query=None, scroll='5m', preserve_order=False, **kwargs):
    
    
    
        if not preserve_order:   # use scan mode when ordering is not required
    
            kwargs['search_type'] = 'scan'
    
        resp = client.search(body=query, scroll=scroll, **kwargs)
    
    
    
        scroll_id = resp.get('_scroll_id')  # the first search returns the _scroll_id token
    
        if scroll_id is None:
    
            return
    
    
    
        first_run = True
    
        while True:
    
            # if the search_type is not scan, the first response already contains hits
    
            if preserve_order and first_run:
    
                first_run = False
    
            else:
    
                resp = client.scroll(scroll_id, scroll=scroll)
    
            if not resp['hits']['hits']:
    
                break
    
            for hit in resp['hits']['hits']:
    
                yield hit    # hand documents back through a generator
    
            scroll_id = resp.get('_scroll_id')
    
            if scroll_id is None:
    
                break

    file: client/__init__.py

    @query_params('scroll')
    
    def scroll(self, scroll_id, params=None):
    
        # subsequent scroll requests go straight to /_search/scroll with the GET method
    
        _, data = self.transport.perform_request('GET', '/_search/scroll',params=params, body=scroll_id)
    
        return data

    When using elasticsearch scan/scroll, also pay attention to the error cases (see the sketch after the example below).

    data = scan(es,
    
        query={"query": {"match": {"domain": "xiaorui.cc"}}},
    
        index="xiaorui_index",
    
        doc_type="blog"
    
    )
    
    for one in data:
    
        print(one)
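
    A minimal sketch (an assumption, not from the original post) of handling those error cases: with raise_on_error=True (the default) a shard failure during scrolling raises ScanError, and connection or HTTP problems surface as TransportError.

    from elasticsearch import Elasticsearch, TransportError
    from elasticsearch.helpers import scan, ScanError

    es = Elasticsearch(["http://192.168.3.63:9200/"])
    try:
        for one in scan(es,
                        query={"query": {"match": {"domain": "xiaorui.cc"}}},
                        index="xiaorui_index",
                        scroll="5m"):
            print(one["_source"])
    except ScanError as e:
        # some shards failed while scrolling and raise_on_error was left at True
        print("scan aborted: %s" % e)
    except TransportError as e:
        print("transport error: %s" % e)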

    Elasticsearch is vast and deep… In my offline and online testing, scroll/scan performance is quite good: results come back quickly, clearly faster than from/size pagination, and it saves elasticsearch search resources.

  • Original article: https://www.cnblogs.com/candlia/p/11919882.html