Es基础数据类型
string 字符串类型,es中最常用的类型,官方文档 比较重要的参数: index分析 analyzed(默认) not_analyzed no store存储 true 独立存储 false(默认)不存储,从_source中解析 Numeric 数值类型,注意numeric并不是一个类型,它包括多种类型,比如:long,integer,short,byte,double,float,每种的存储空间都是不一样的,一般默认推荐integer和float。官方文档参考 重要的参数: index分析 not_analyzed(默认) ,设置为该值可以保证该字段能通过检索查询到 no store存储 true 独立存储 false(默认)不存储,从_source中解析 date 日期类型,该类型可以接受一些常见的日期表达方式,官方文档参考。 重要的参数: index分析 not_analyzed(默认) ,设置为该值可以保证该字段能通过检索查询到 no store存储 true 独立存储 false(默认)不存储,从_source中解析 format格式化 strict_date_optional_time||epoch_millis(默认) 你也可以自定义格式化内容,比如 "date": { "type": "date", "format": "yyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } 更多的时间表达式可以参考这里 IP 这个类型可以用来标识IPV4的地址,参考官方文档 常用参数: index分析 not_analyzed(默认) ,设置为该值可以保证该字段能通过检索查询到 no store存储 true 独立存储 false(默认)不存储,从_source中解析 boolean 布尔类型,所有的类型都可以标识布尔类型,参考官方文档 False: 表示该值的有:false, "false", "off", "no", "0", "" (empty string), 0, 0.0 True: 所有非False的都是true 重要的参数: index分析 not_analyzed(默认) ,设置为该值可以保证该字段能通过检索查询到 no store存储 true 独立存储 false(默认)不存储,从_source中解析
Es的head插件使用
1.查看集群中的索引信息和查看索引中定义的列的类型
2.查看es是如何对某个字符串进行切分的
Es对Date类型的处理
es如何存储Date类型,它最终的输出方式都是以字符串输出,只是默认的格式是:1970-01-01T00:00:00Z
,也就是默认的 UTC
格式
所以我们在查询和添加时间类型数据的时候需要把格式手动转换成UTC格式 否则会出现时间格式转换异常
异常查询示例:
正常查询示例:
Es查询条件设置
body = { "query": { "filtered": { "filter": { "bool":{ "must": [ //must数组中的三个条件必须同时满足 {"term":{"jylsh": jylsh}}, { "range": { "@timestamp": { "gte": tools.strtime_to_timestamp(startTime), "lte": tools.strtime_to_timestamp(endTime) } } }, { "bool": { "should": [ //should数组中的条件至少需要满足一个 {"match": {"message": "SERVICE_LOG"}}, {"match": {"message": "ENTRANCE_LOG"}} ] } } ] } } } }, "sort": sort_dict }
Es查询代码示例
1.需要把时间转换成unix格式的13位数的时间戳字符 精确到毫秒级
#格式化日期字符串 def formatDate(self,timestr): timeStruct = time.strptime(timestr, "%Y-%m-%d %H:%M:%S.%f") strTime = time.strftime("%Y-%m-%d %H:%M:%S", timeStruct) return strTime import time t = "2017-11-24 17:30:00" #将其转换为时间数组 timeStruct = time.strptime(t, "%Y-%m-%d %H:%M:%S") #转换为时间戳: timeStamp = int(time.mktime(timeStruct)) print(timeStamp)
#!/usr/bin/env python # -*- coding: utf-8 -*- from elasticsearch import Elasticsearch import time,datetime ####动态修改配置项目############ es = Elasticsearch("http://127.0.0.1:9200") appname="dzgzpt-wsys" userChoiceTime_start="2019-04-05 09:27:27.820" userChoiceTime_end="2019-06-27 09:27:41.986" #东八区时间 nowtime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] #计算15分钟前的时间 fifteenminAgo=(datetime.datetime.now()+datetime.timedelta(minutes=-15)).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] #计算1个小时前的时间 hourAgo=(datetime.datetime.now()+datetime.timedelta(hours=-1)).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] #计算1天前的时间 dayAgo=(datetime.datetime.now()+datetime.timedelta(days=-1)).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] def strtime_to_datetime(timestr): """将字符串格式的时间 (含毫秒) 转为 datetime 格式 :param timestr: {str}'2016-02-25 20:21:04.242' :return: {datetime}2016-02-25 20:21:04.242000 """ local_datetime = datetime.datetime.strptime(timestr, "%Y-%m-%d %H:%M:%S.%f") return local_datetime def datetime_to_timestamp(datetime_obj): """将本地(local) datetime 格式的时间 (含毫秒) 转为毫秒时间戳 :param datetime_obj: {datetime}2016-02-25 20:21:04.242000 :return: 13 位的毫秒时间戳 1456402864242 """ local_timestamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) return local_timestamp def strtime_to_timestamp(local_timestr): """将本地时间 (字符串格式,含毫秒) 转为 13 位整数的毫秒时间戳 :param local_timestr: {str}'2016-02-25 20:21:04.242' :return: 1456402864242 """ local_datetime = strtime_to_datetime(local_timestr) timestamp = datetime_to_timestamp(local_datetime) return timestamp def set_interval(startTime,endTime): d1 = datetime.datetime.strptime(startTime, "%Y-%m-%d %H:%M:%S.%f") d2 = datetime.datetime.strptime(endTime, "%Y-%m-%d %H:%M:%S.%f") n_days = d2 - d1 days = n_days.days year = days // 365 month = days % 365 // 30 seconds = n_days.seconds mins = seconds // 60 hours = mins // 60 if year >=1: return "1M" elif month == 1: return "12h" elif month > 1: return "1d" elif days == 1: return "30m" elif days >=2 and days <=4: return "1h" elif days >= 5 and days <=12: return "3h" elif days >=13 and days <= 30: return "12h" elif hours == 1 or hours ==2: return "1m" elif hours >=3 and hours<5: return "3m" elif hours >=5 and hours<10: return "5m" elif hours >=10 and hours<=60: return "10m" elif mins>=1 and mins<=30: return "30s" elif mins>30 and mins <=60: return "1m" def search_qps(startTime,endTime): interval=set_interval(startTime,endTime) print("interval设置为 %s" %(interval)) print("开始qps时间%s" %(startTime)) print("结束qps时间%s " %(endTime)) body={ "size": 0, "query": { "filtered": { "query": { "query_string": { "analyze_wildcard": True, "query": "appname:"+appname } }, "filter": { "bool": { "must": [ { "range": { "@timestamp": { "gte": strtime_to_timestamp(startTime), "lte": strtime_to_timestamp(endTime) } } } ] } } } }, "aggs": { "res": { "date_histogram": { "field": "@timestamp", "interval": "%s" %(interval), "time_zone": "Asia/Shanghai", "min_doc_count": 0, "extended_bounds": { "min": strtime_to_timestamp(startTime), "max": strtime_to_timestamp(endTime) } } } } } res=es.search(body=body) print(res) print("查询到qps总条数: %s" %(len(res["aggregations"]["res"]["buckets"]))) def search_wastetime(startTime,endTime): print("开始延迟时间%s" % (startTime)) print("结束延迟时间%s " % (endTime)) body = { "size": 0, "query": { "filtered": { "query": { "query_string": { "analyze_wildcard": True, "query": "appname:" + appname } }, "filter": { "bool": { "must": [ { "range": { "@timestamp": { "gte": strtime_to_timestamp(startTime), "lte": strtime_to_timestamp(endTime) } } } ] } } } }, "aggs": { "avg_wastetime": { "avg": { "field": "waste_time" } } } } res=es.search(body=body) print(res) def search_abnormal(startTime,endTime): print("开始异常统计时间%s" % (startTime)) print("结束异常统计时间%s " % (endTime)) body = { "size": 0, "query": { "filtered": { "query": { "query_string": { "analyze_wildcard": True, "query": "appname:" + appname } }, "filter": { "bool": { "must": [ { "range": { "@timestamp": { "gte": strtime_to_timestamp(startTime), "lte": strtime_to_timestamp(endTime) } } } ] } } } }, "aggs": { "res": { "terms": { "field": "success" } } } } res=es.search(body=body) reslist=res["aggregations"]["res"]["buckets"] for re in reslist: if re["key"] == "false": print("统计到的异常数 %s" %(re["doc_count"])) print("---------------------15分钟内的qps总数-------------------") search_qps(fifteenminAgo,nowtime) print("-------------------用户设置时间段的qps总数------------------------") search_qps(userChoiceTime_start,userChoiceTime_end) print("*******************************************************************") print("---------------------15分钟内的延迟平均值-------------------") search_wastetime(fifteenminAgo,nowtime) print("-------------------用户设置时间段的延迟平均值------------------------") search_wastetime(userChoiceTime_start,userChoiceTime_end) print("*******************************************************************") print("---------------------15分钟内的异常数-------------------") search_abnormal(fifteenminAgo,nowtime) print("-------------------用户设置时间段的异常数------------------------") search_abnormal(userChoiceTime_start,userChoiceTime_end)
def search_test(): body={ "query" : { "constant_score" : { "filter" : { "range" : { "@timestamp":{ "gt": "now-3m" } } } } } } res=es.search(body=body) print(res) import time, datetime def timeToZtime(time): myTime = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S") return myTime.strftime("%Y-%m-%dT%H:%M:%S.%fZ") def search_test2(): body={ "query" : { "constant_score" : { "filter" : { "range" : { "@timestamp":{ "gt": timeToZtime("2019-01-13 12:10:30"), "lt": timeToZtime("2019-01-13 12:10:30")+"||+1M" } } } } } } res=es.search(body=body) print(res) search_test2()
import time, datetime def strtime_to_datetime(timestr): """将字符串格式的时间 (含毫秒) 转为 datetime 格式 :param timestr: {str}'2016-02-25 20:21:04.242' :return: {datetime}2016-02-25 20:21:04.242000 """ local_datetime = datetime.datetime.strptime(timestr, "%Y-%m-%d %H:%M:%S.%f") return local_datetime def datetime_to_timestamp(datetime_obj): """将本地(local) datetime 格式的时间 (含毫秒) 转为毫秒时间戳 :param datetime_obj: {datetime}2016-02-25 20:21:04.242000 :return: 13 位的毫秒时间戳 1456402864242 """ local_timestamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) return local_timestamp def strtime_to_timestamp(local_timestr): """将本地时间 (字符串格式,含毫秒) 转为 13 位整数的毫秒时间戳 :param local_timestr: {str}'2016-02-25 20:21:04.242' :return: 1456402864242 """ local_datetime = strtime_to_datetime(local_timestr) timestamp = datetime_to_timestamp(local_datetime) return timestamp #调用api处理时间 def search_qps_avg(): body2={ "size": 0, "query": { "filtered": { "query": { "query_string": { "analyze_wildcard": True, "query": "appname:dzgzpt-wsys" } }, "filter": { "bool": { "must": [ { "range": { "@timestamp": { "gte": strtime_to_timestamp("2019-04-05 09:27:27.820"), "lte": strtime_to_timestamp("2019-04-18 09:34:41.986") } } } ] } } } }, "aggs": { "2": { "date_histogram": { "field": "@timestamp", "interval": "12h", "time_zone": "Asia/Shanghai", "min_doc_count": 1, "extended_bounds": { "min": strtime_to_timestamp("2019-04-05 09:27:27.820"), "max": strtime_to_timestamp("2019-04-18 09:34:41.986") } } } } } body1 = { "size": 0, "query": { "filtered": { "query": { "query_string": { "analyze_wildcard": 'true', "query": "appname:dzgzpt-wsys" } }, "filter": { "bool": { "must": [ { "range": { "@timestamp": { "gte": 1554427647820, "lte": 1555551281986, "format": "epoch_millis" } } } ], "must_not": [] } } } }, "aggs": { "2": { "date_histogram": { "field": "@timestamp", "interval": "12h", "time_zone": "Asia/Shanghai", "min_doc_count": 1, "extended_bounds": { "min": 1554427647820, "max": 1555551281986 } } } } } res=es.search(body=body2) print(res) search_qps_avg()
#unix的utc标准时间 print(datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]) #东八区时间 print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]) #计算15分钟后的时间 print((datetime.datetime.now()+datetime.timedelta(minutes=15)).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) #计算15分钟前的时间 print((datetime.datetime.now()+datetime.timedelta(minutes=-15)).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) #计算一天后的时间 print((datetime.datetime.now()+datetime.timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) #计算1个小时后的时间 print((datetime.datetime.now()+datetime.timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) 2019-06-04 05:37:41.564 2019-06-04 13:37:41.564 2019-06-04 13:52:41.564 2019-06-04 13:22:41.564 2019-06-05 13:37:41.564 2019-06-04 14:37:41.564
import datetime d1 = datetime.datetime.strptime("2019-06-27 09:27:27.820", "%Y-%m-%d %H:%M:%S.%f") d2 = datetime.datetime.strptime("2021-07-29 15:43:41.986", "%Y-%m-%d %H:%M:%S.%f") n_days = d2 - d1 print("------------计算时分秒-----------------") seconds = n_days.seconds print("相差秒数: %s" %(seconds)) mins = seconds // 60 print("相差分钟数: %s" %(mins)) hours = mins // 60 print("相差小时数: %s" %(hours)) print("-------------计算年月日---------------------") days=n_days.days print("相差天数: %s" %(days)) month= days%365//30 print("相差月数: %s" %(month)) year = days // 365 print("相差年数: %s" %(year))