不具有通用性,留作纪念。
# Captured from a shell session: [root@GXB-CTRLCENTER python]# cat insert_active_user.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Copy yesterday's active users from Elasticsearch into MySQL.

The script asks Elasticsearch for every (tenant, user) pair seen in the
``client-*`` indices between local midnight yesterday and local midnight
today, together with the address each user was seen from most often, and
stores one row per pair in the ``active_user`` table.  Rows already stored
for yesterday are deleted first, so running the script several times on the
same day does not create duplicates.
"""
import json
import time
from contextlib import closing
from datetime import date, timedelta

try:  # Python 2, which this script was originally written for
    from urllib2 import Request, urlopen
except ImportError:  # Python 3
    from urllib.request import Request, urlopen

ES_SERVER = 'http://elk.xkops.com:9200/'
ES_INDEX = 'client-*'
SEARCH_URL = ES_SERVER + ES_INDEX + "/_search?pretty=true"

MYSQL_DATABASE = 'logstash'
DELETE_SQL = "delete from active_user where create_time = %s"
INSERT_SQL = ("insert into active_user(tenant_id,create_time,user_id,ip_addr) "
              "values(%s,%s,%s,%s)")


def local_midnight_ms(day):
    """Return the epoch time, in milliseconds, of local midnight on *day*."""
    return int(time.mktime(day.timetuple())) * 1000


def yesterday_window(today=None):
    """Return ``(yesterday, start_ms, end_ms)`` for the day before *today*.

    *today* defaults to the current local date.  ``start_ms`` and ``end_ms``
    are the local midnights that open and close yesterday, expressed as
    epoch milliseconds.
    """
    if today is None:
        today = date.today()
    yesterday = today - timedelta(days=1)
    return yesterday, local_midnight_ms(yesterday), local_midnight_ms(today)


def build_query(start_ms, end_ms):
    """Build the search body for events with start_ms <= @timestamp < end_ms.

    No hits are requested (``size`` 0); the answer comes from three nested
    terms aggregations: "2" groups by ``visit_tenant_id``, "3" groups each
    tenant by ``user_id`` and "4" keeps, per user, the single ``ip_address``
    with the highest document count.
    """
    time_range = {
        "gte": start_ms,
        # Exclusive upper bound: an event stamped exactly at today's
        # midnight belongs to today's run, not to yesterday's.
        "lt": end_ms,
        "format": "epoch_millis",
    }
    top_ip_per_user = {
        "terms": {
            "field": "ip_address",
            "size": 1,
            "order": {"_count": "desc"},
        },
    }
    users_per_tenant = {
        "terms": {
            "field": "user_id",
            # NOTE(review): size 0 presumably means "no limit" on the
            # Elasticsearch version in use -- confirm before upgrading.
            "size": 0,
            "order": {"_count": "desc"},
        },
        "aggs": {"4": top_ip_per_user},
    }
    tenants = {
        "terms": {
            "field": "visit_tenant_id",
            "size": 10000000,
            "order": {"_count": "desc"},
        },
        "aggs": {"3": users_per_tenant},
    }
    return {
        "query": {
            "filtered": {
                "query": {
                    "query_string": {
                        "query": "*",
                        "analyze_wildcard": True,
                    },
                },
                "filter": {
                    "bool": {
                        "must": [{"range": {"@timestamp": time_range}}],
                        "must_not": [],
                    },
                },
            },
        },
        "size": 0,
        "aggs": {"2": tenants},
    }


def search(query, url=SEARCH_URL):
    """POST *query* as JSON to *url* and return the decoded JSON response."""
    payload = json.dumps(query).encode('utf-8')
    # closing() releases the HTTP connection even if reading or parsing fails.
    with closing(urlopen(Request(url, payload))) as response:
        return json.loads(response.read().decode('utf-8'))


def iter_active_users(result):
    """Yield ``(tenant_id, user_id, ip_address)`` for every user bucket.

    *result* is the decoded search response.  Tenants without user buckets
    yield nothing.  A user bucket without any ``ip_address`` bucket yields an
    empty address instead of raising IndexError, so one incomplete bucket
    cannot abort the run.
    """
    for tenant in result['aggregations']['2']['buckets']:
        for user in tenant['3']['buckets']:
            ip_buckets = user['4']['buckets']
            ip_address = ip_buckets[0]['key'] if ip_buckets else ''
            yield tenant['key'], user['key'], ip_address


def replace_active_users(day, rows):
    """Replace the ``active_user`` rows stored for *day* with *rows*.

    *rows* is an iterable of ``(tenant_id, user_id, ip_address)`` tuples.
    The delete and every insert share one ``conn_to_mysql`` context instead
    of opening a new one per row.
    """
    # Imported here so the helpers above stay importable on machines that do
    # not have the project-local database module.
    from with_conn_to_db import conn_to_mysql

    created = str(day)
    # NOTE(review): assumes conn_to_mysql yields a DB-API cursor whose
    # execute() accepts a parameter tuple, and that leaving the context
    # commits -- confirm against with_conn_to_db.
    with conn_to_mysql(MYSQL_DATABASE) as db:
        # Values are passed as parameters, never formatted into the SQL text,
        # because they come straight from logged client data.
        db.execute(DELETE_SQL, (created,))
        for tenant_id, user_id, ip_address in rows:
            db.execute(INSERT_SQL, (tenant_id, created, user_id, ip_address))


def main():
    """Fetch yesterday's active users and store them in MySQL."""
    yesterday, start_ms, end_ms = yesterday_window()
    # Query first: if Elasticsearch is unreachable nothing has been deleted.
    result = search(build_query(start_ms, end_ms))
    replace_active_users(yesterday, iter_active_users(result))


if __name__ == '__main__':
    main()