zoukankan      html  css  js  c++  java
  • 使用Python对ElasticSearch获取数据及操作

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    """
        @Time    : 2018/7/4
        @Author  : LiuXueWen
        @Site    : 
        @File    : ElasticSearchOperation.py
        @Software: PyCharm
        @Description: 对elasticsearch数据的操作,包括获取数据,发送数据
    """
    import elasticsearch
    import json
    
    import Util_Ini_Operation
    
    class elasticsearch_data():
        def __init__(self,hosts,username,password,maxsize,is_ssl):
            # 初始化ini操作脚本,获取配置文件
            try:
                # 判断请求方式是否ssl加密
                if is_ssl == "true":
                    # 获取证书地址
                    cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
                    es_ssl = elasticsearch.Elasticsearch(
                        # 地址
                        hosts=hosts,
                        # 用户名密码
                        http_auth=(username,password),
                        # 开启ssl
                        use_ssl=True,
                        # 确认有加密证书
                        verify_certs=True,
                        # 对应的加密证书地址
                        client_cert=cert_pem
                    )
                    self.es = es_ssl
                elif is_ssl == "false":
                    # 创建普通类型的ES客户端
                    es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
                    self.es = es_ordinary
            except Exception as e:
                print(e)
    
    
        def query_data(self,keywords_list,date):
            gte = "now-"+str(date)
            query_data = {
                # 查询语句
                "query": {
                    "bool": {
                        "must": [
                            {
                                "query_string": {
                                    "query": keywords_list,
                                    "analyze_wildcard": True
                                }
                            },
                            {
                                "range": {
                                    "@timestamp": {
                                        "gte": gte,
                                        "lte": "now",
                                        "format": "epoch_millis"
                                    }
                                }
                            }
                        ],
                        "must_not": []
                    }
                }
            }
            return query_data
    
        # 从es获取数据
        def get_datas_by_query(self,index_name,keywords,param,date):
            '''
            :param index_name: 索引名称
            :param keywords: 关键字词,数组
            :param param: 需要数据条件,例如_source
            :param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m"
            :return: all_datas 返回查询到的所有数据(已经过param过滤)
            '''
    
            all_datas = []
            # 遍历所有的查询条件
            for keywords_list in keywords:
                # DSL语句
                query_data = self.query_data(keywords_list,date)
                res = self.es.search(
                    index=index_name,
                    body=query_data
                )
                for hit in res['hits']['hits']:
                    # 获取指定的内容
                    response = hit[param]
                    # 添加所有数据到数据集中
                    all_datas.append(response)
            # 返回所有数据内容
            return all_datas
    
        # 当索引不存在创建索引
        def create_index(self,index_name):
            '''
            :param index_name: 索引名称
            :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称
            '''
            # 获取索引的映射
            # index_mapping = IndexMapping.index_mapping
            # # 判断索引是否存在
            # if self.es.indices.exists(index=index_name) is not True:
            #     # 创建索引
            #     res = self.es.indices.create(index=index_name,body=index_mapping)
            #     # 返回结果
            #     return res
            # else:
            #     # 返回索引名称
            #     return index_name
            pass
    
        # 插入指定的单条数据内容
        def insert_single_data(self,index_name,doc_type,data):
            '''
            :param index_name: 索引名称
            :param doc_type: 文档类型
            :param data: 需要插入的数据内容
            :return: 执行结果
            '''
            res = self.es.index(index=index_name,doc_type=doc_type,body=data)
            return res
    
        # 向ES中新增数据,批量插入
        def insert_datas(self,index_name):
            '''
            :desc 通过读取指定的文件内容获取需要插入的数据集
            :param index_name: 索引名称
            :return: 插入成功的数据条数
            '''
            insert_datas = []
            # 判断插入数据的索引是否存在
            self.createIndex(index_name=index_name)
            # 获取插入数据的文件地址
            data_file_path = self.ini.get_key_value("datafile","datafilepath")
            # 获取需要插入的数据集
            with open(data_file_path,"r+") as data_file:
                # 获取文件所有数据
                data_lines = data_file.readlines()
                for data_line in data_lines:
                    # string to json
                    data_line = json.loads(data_line)
                    insert_datas.append(data_line)
            # 批量处理
            res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)
            return res
    
        # 从ES中在指定的索引中删除指定数据(根据id判断)
        def delete_data_by_id(self,index_name,doc_type,id):
            '''
            :param index_name: 索引名称
            :param index_type: 文档类型
            :param id: 唯一标识id
            :return: 删除结果信息
            '''
            res = self.es.delete(index=index_name,doc_type=doc_type,id=id)
            return res
    
        # 根据条件删除数据
        def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
            '''
            :param index_name:索引名称,为空查询所有索引
            :param doc_type:文档类型,为空查询所有文档类型
            :param param:过滤条件值
            :param gt_time:时间范围,大于该时间
            :param lt_time:时间范围,小于该时间
            :return:执行条件删除后的结果信息
            '''
            # DSL语句
            query_data = {
                # 查询语句
                "query": {
                    "bool": {
                        "must": [
                            {
                                "query_string": {
                                    "query": param,
                                    "analyze_wildcard": True
                                }
                            },
                            {
                                "range": {
                                    "@timestamp": {
                                        "gte": gt_time,
                                        "lte": lt_time,
                                        "format": "epoch_millis"
                                    }
                                }
                            }
                        ],
                        "must_not": []
                    }
                }
            }
            res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
            return res
    
        # 指定index中删除指定时间段内的全部数据
        def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):
            '''
            :param index_name:索引名称,为空查询所有索引
            :param doc_type:文档类型,为空查询所有文档类型
            :param gt_time:时间范围,大于该时间
            :param lt_time:时间范围,小于该时间
            :return:执行条件删除后的结果信息
            '''
            # DSL语句
            query_data = {
                # 查询语句
                "query": {
                    "bool": {
                        "must": [
                            {
                                "match_all": {}
                            },
                            {
                                "range": {
                                    "@timestamp": {
                                        "gte": gt_time,
                                        "lte": lt_time,
                                        "format": "epoch_millis"
                                    }
                                }
                            }
                        ],
                        "must_not": []
                    }
                }
            }
            res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)
            return res
    
        # 修改ES中指定的数据
        def update_data_by_id(self,index_name,doc_type,id,data):
            '''
            :param index_name: 索引名称
            :param doc_type: 文档类型,为空表示所有类型
            :param id: 文档唯一标识编号
            :param data: 更新的数据
            :return: 更新结果信息
            '''
            res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)
            return res

    转载自https://blog.csdn.net/carolcoral/article/details/80984712

  • 相关阅读:
    NPOI 菜鸟实践行之根据指定的模板生成Excel 2003格式的文件 (一)
    python循环字典
    对字典中找出最大的值
    通用解决方案:解决NHibernate SELECT 多表查询结果List绑定控件显示问题。
    读取Excel。。。
    哈,申请成功了。
    SharePoint 使用命令行备份过程中中断导致站点没有权限更改问题
    反射
    SharePoint 自定义主机标头与本地计算机名称不匹配,验证失败
    产生一个int数组,长度为100,并向其中随机插入1100,并且不能重复。自己写的算法
  • 原文地址:https://www.cnblogs.com/double-orange/p/10075860.html
Copyright © 2011-2022 走看看