zoukankan      html  css  js  c++  java
  • ELK-Python(三)

     不具有通用性,留作纪念。

    [root@GXB-CTRLCENTER python]# cat insert_uv.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from datetime import *
    from with_conn_to_db import conn_to_mysql
    import urllib2,json
    import time
    
    # Query window: from local midnight at the start of yesterday up to local
    # midnight at the start of today, expressed as epoch milliseconds.
    today = date.today()
    yestoday = today - timedelta(days=1)

    # date.timetuple() yields 00:00:00 with an undetermined DST flag, so
    # time.mktime() interprets it as local-time midnight of that day.
    start_time = int(time.mktime(yestoday.timetuple())) * 1000
    end_time = int(time.mktime(today.timetuple())) * 1000
    
    # Elasticsearch endpoint: base server URL, the index pattern to search,
    # and the resulting _search URL (pretty-printed JSON response).
    server = 'http://elk.xkops.com:9200/'
    index = 'client-*'
    url = ''.join([server, index, "/_search?pretty=true"])
    
    # Elasticsearch request body, assembled bottom-up from its parts.

    # Filter: only documents whose @timestamp lies inside the query window.
    _time_window = {
        "range": {
            "@timestamp": {
                "gte": start_time,
                "lte": end_time,
                "format": "epoch_millis",
            }
        }
    }

    # Query: match every document (wildcard query string).
    _match_all = {
        "query_string": {
            "analyze_wildcard": True,
            "query": "*",
        }
    }

    # Innermost aggregation "2": bucket by sessionId, ordered by sum of "count".
    _per_session = {
        "terms": {
            "field": "sessionId",
            "size": 1000000,
            "order": {"1": "desc"},
        },
        "aggs": {
            "1": {"sum": {"field": "count"}},
        },
    }

    # Aggregation "3": bucket by client_type (at most 2 buckets are requested).
    _per_client_type = {
        "terms": {
            "field": "client_type",
            "size": 2,
            "order": {"1": "desc"},
        },
        "aggs": {
            "1": {"sum": {"field": "count"}},
            "2": _per_session,
        },
    }

    # Outermost aggregation "4": bucket by visit_tenant_id.
    _per_tenant = {
        "terms": {
            "field": "visit_tenant_id",
            "size": 1000000,
            "order": {"1": "desc"},
        },
        "aggs": {
            "1": {"sum": {"field": "count"}},
            "3": _per_client_type,
        },
    }

    # "size": 0 asks for no hits, only the aggregation results.
    query_date = {
        "query": {
            "filtered": {
                "query": _match_all,
                "filter": {
                    "bool": {
                        "must": [_time_window],
                        "must_not": [],
                    }
                },
            }
        },
        "size": 0,
        "aggs": {"4": _per_tenant},
    }
    
    
    # Serialize the query, POST it to Elasticsearch (passing a data argument
    # to urllib2.Request makes the request a POST) and decode the JSON reply
    # into `result`.
    query_date = json.dumps(query_date)
    req = urllib2.Request(url, query_date)
    response = urllib2.urlopen(req)
    try:
        page = response.read()
    finally:
        # Always release the HTTP connection, even if reading the body fails.
        response.close()
    result = json.loads(page)
    
    # Avoid duplicate rows when the job runs more than once for the same day:
    # remove the rows stamped with yesterday's date before inserting new ones.
    purge_sql = "delete from uv_stat where create_time = '%s'" % yestoday
    with conn_to_mysql('logstash') as db:
        db.execute(purge_sql)
    
    # Insert one uv_stat row per (tenant, client type, session) found in the
    # aggregation result, stamped with yesterday's date.
    #
    # NOTE(review): the values are interpolated straight into the SQL text.
    # If db.execute() accepts bound parameters, prefer passing them separately
    # (session ids come from log data) -- confirm against with_conn_to_db.
    # NOTE(review): a connection context is entered for every row, as before;
    # whether one context can safely cover all inserts depends on how
    # conn_to_mysql commits -- confirm before hoisting it out of the loop.
    insert_sql = "insert into uv_stat(tenant_id,create_time,session_id,client_type) values('%s','%s','%s','%s')"
    for tenant_bucket in result['aggregations']['4']['buckets']:
        tenant_id = tenant_bucket['key']
        # Walk every client-type bucket that came back (the query requests at
        # most two). Looping instead of indexing [0] and [1] handles a tenant
        # with no client-type buckets without raising IndexError, and removes
        # the duplicated insert code.
        for type_bucket in tenant_bucket['3']['buckets']:
            client_type = type_bucket['key']
            for session_bucket in type_bucket['2']['buckets']:
                session_id = session_bucket['key']
                sql = insert_sql % (tenant_id, yestoday, session_id, client_type)
                with conn_to_mysql('logstash') as db:
                    db.execute(sql)
  • 相关阅读:
    网络编程
    C 语言 const
    C 语言 链表
    C 语言 按位计算
    C 语言 格式化输出输入
    C 语言 结构类型 联合
    C 语言 结构类型 结构
    C 语言 结构类型 枚举
    bash shell configuration
    sed usage
  • 原文地址:https://www.cnblogs.com/xkops/p/5602086.html
Copyright © 2011-2022 走看看