zoukankan      html  css  js  c++  java
  • python连接ES进行数据过滤删除,新增查询、创建索引功能

    # -*- coding: utf-8 -*-
    """
    @Time :2020/06/04
    @UpTime :2020/06/23
    @Author :Mr.Yang
    @File :ElasticSearch_operations.py
    @Software :PyCharm
    @Description :对ES进行查询,创建索引、按时间戳删除操作(增加写入操作待定)
    @plug-in package:目前我所使用的是7.7.1的插件包,pip install elasticsearch==7.7.1
    """

    import time
    import datetime
    import json
    import sys
    from elasticsearch import Elasticsearch

    """当前时间及时间戳转换"""
    Time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
    """当前时间计算"""
    now_time = datetime.date.today()
    """当前时间的前一个月"""
    last_time = datetime.datetime.now() - datetime.timedelta(days = 1)
    """字符格式类型转换"""
    now_time = last_time.strftime("%Y-%m-%d %H:%M:%S")
    """换算时间戳为毫秒级"""
    mtime = (int(time.mktime(last_time.timetuple())) * 1000)

    """ES连接配置"""
    es_host = ['', '', '']
    es = Elasticsearch(hosts=es_host)

    """ES查询、删除修改下列参数"""
    index_name = ""
    id = ""

    def query(index_name,id):
    """查询转换为json格式"""
    res = es.get(index=index_name,id=id)
    print json.dumps(res, sort_keys=True, indent=4, separators=(', ', ': '))#['hits']['hits']

    def deles():
    """查询所有数据"""
    noll = {
    "query": {
    "bool": {
    "must_not": {
    "exists": {
    "field": "updateTime"
    }
    }
    }
    }
    }
    """小于更新时间戳"""
    updatetime = {
    "query":{
    "range":{
    "updateTime": {
    "lte": mtime
    }
    }
    }
    }
    """小于创建时间戳"""
    createtime = {
    "query":{
    "range":{
    "createTime":{
    "lte":mtime
    }
    }
    }
    }

    #updata = es.search(index=index_name,body=updatetime)
    #create = es.search(index=index_name,body=createtime)
    """查询updateTime是否为空"""
    updatanoll = es.search(index=index_name,body=noll)
    """删除更新时间小于设定时间戳的数据"""
    es.delete_by_query(index=index_name, body=updatetime)
    """查询结果转换为json格式"""
    #res = json.dumps(updata, sort_keys=True, indent=4, separators=(', ', ': ')
    """查询结果匹配到单个值"""
    #res = json.dumps(updatenot["hits"]["hits"][0]["_source"]["updateTime"], sort_keys=True, indent=4, separators=(', ', ': '))
    """删除更新时间为空的数据,以插入时间做判断"""
    if updatanoll != " ":
    es.delete_by_query(index=index_name,body=createtime)
    else:
    print "无更新时间为空的数据"

    def create():
    """创建索引"""
    body = {
    "settings":{
    "index":{
    "number_of_shards": "3",
    "number_of_replicas": "2"
    }
    }
    }
    try:
    result = es.indices.create(index='ceshi',body=body)
    print result
    except Exception as err:
    print err


    choose = input("请输出选项1/2/3(1查询,2过滤以更新时间为1个月以前的删除,3创建索引):")
    if choose == 1:
    query(index_name, id)
    elif choose == 2:
    deles()
    elif choose == 3:
    create()
    else:
    print "请正确输入参数(1/2/3(1查询,2过滤以更新时间为1个月以前的删除,3创建索引)"
  • 相关阅读:
    第七章 下
    第七章 上
    第六章 下
    第六章 上
    第五章 下
    linux 系统通过安装包安装mysql 8 步骤(非MariaDB)
    热烈庆祝博客换了新皮肤
    异常处理
    栈计算逆波兰表达式
    栈计算中缀表达式
  • 原文地址:https://www.cnblogs.com/Huang-Niu/p/13049746.html
Copyright © 2011-2022 走看看