zoukankan      html  css  js  c++  java
  • 通过抓取pintpoint2的页面信息把数据存入数据库python3

    目标:对生产环境的服务质量进行量化,

    解决办法:把pintpoint2里的数据转存入mysql数据库,作成报表,目前支持总请求数,错误请求数,中位数,平均数,95值(每分钟一次定时任务),其它指标可以根据要求进行增加。

    数据库建库语句:

    CREATE TABLE `time_analysis` (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `datetime` datetime DEFAULT NULL COMMENT '时间',
    `application_name` varchar(32) DEFAULT NULL COMMENT '应用名',
    `totalcount` int(8) DEFAULT NULL COMMENT '总请求数',
    `errorcount` int(8) DEFAULT NULL COMMENT '错误请求数',
    `median` int(5) DEFAULT NULL COMMENT '中位数',
    `average` int(5) DEFAULT NULL COMMENT '平均数',
    `distribution95` int(5) DEFAULT NULL COMMENT '95值',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=358 DEFAULT CHARSET=utf8mb4 COMMENT='数据分析结果存署表';

    CREATE TABLE `application_list` (
    `application_name` varchar(32) NOT NULL,
    `service_type` varchar(32) DEFAULT NULL COMMENT '服务类型',
    `code` int(11) DEFAULT NULL COMMENT '服务类型代码',
    `agents` int(11) DEFAULT NULL COMMENT 'agent个数',
    `agentlists` varchar(256) DEFAULT NULL COMMENT 'agent list',
    `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`application_name`),
    UNIQUE KEY `Unique_App` (`application_name`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='pinpoint app list';

    代码:getscatterdata.py 数据分析程序

    # -*- coding: utf-8 -*-

    # noinspection PyInterpreter,PyInterpreter

    import sys
    import requests
    import time
    import datetime
    import json
    import numpy as np

    sys.path.append('../Golf')
    import db #db.py

    PPURL = "http://*****/"


    From_Time = datetime.datetime.now() + datetime.timedelta(seconds=-60)
    To_Time = datetime.datetime.now()
    From_TimeStamp = int(time.mktime(From_Time.timetuple()))*1000
    To_TimeStamp = int(time.mktime(datetime.datetime.now().timetuple()))*1000


    class PinPoint(object):
    """docstring for PinPoint"""
    def __init__(self, db):
    self.db = db
    super(PinPoint, self).__init__()

    """获取pinpoint中应用"""
    def get_applications(self):
    '''return application dict
    '''
    applicationListUrl = PPURL + "/applications.pinpoint"
    res = requests.get(applicationListUrl)
    if res.status_code != 200:
    print("请求异常,请检查")
    return
    applicationLists = []
    for app in res.json():
    applicationLists.append(app)
    applicationListDict={}
    applicationListDict["applicationList"] = applicationLists
    return applicationListDict
    def getAgentList(self, appname):
    AgentListUrl = PPURL + "/getAgentList.pinpoint"
    param = {
    'application':appname
    }
    res = requests.get(AgentListUrl, params=param)
    if res.status_code != 200:
    print("请求异常,请检查")
    return
    return len(res.json().keys()),json.dumps(list(res.json().keys()))

    def update_servermap(self, appname, from_time=From_TimeStamp,
    to_time=To_TimeStamp):
    '''更新app上下游关系
    :param appname: 应用名称
    :param from_time: 起始时间
    :param to_time: 终止时间
    :
    '''
    #https://pinpoint.*****.com/getServerMapData.pinpoint?applicationName=test-app&from=1547721493000&to=1547721553000&callerRange=1&calleeRange=1&serviceTypeName=TOMCAT&_=1547720614229
    #http://pinpoint.weixing-tech.com/getScatterData.pinpoint?application=daimler-manage-admin-pro&from=1618992044000&to=1618992104000&limit=5000&filter=&xGroupUnit=130&yGroupUnit=0&backwardDirection=true
    param = {
    'application':appname,
    'from':from_time,
    'to':to_time,
    'limit':5000,
    'filter':'',
    'xGroupUnit':130,
    'yGroupUnit':0,
    'backwardDirection':'true'
    }

    # serverMapUrl = PPURL + "/getScatterData.pinpoint"
    serverMapUrl = "{}{}".format(PPURL, "/getScatterData.pinpoint")
    res = requests.get(serverMapUrl, params=param)
    if res.status_code != 200:
    print("请求异常,请检查")
    return
    timetemp = float(from_time/1000)
    time_local = time.localtime(timetemp)
    update_time = time.strftime('%Y-%m-%d %H:%M:%S',time_local)
    time_analysis_list = []
    time_analysis_error = []
    links = res.json()["scatter"]["dotList"]
    for link in links :
    #时间戳,应用名,总请求数,错误请求数,中位数,平均数,95值(每分钟一次定时任务)
    time_analysis_list.append(link[1])
    time_analysis_error.append(link[4])
    if len(time_analysis_list) != 0:
    totalcount = int(len(time_analysis_list))
    errorcount = time_analysis_error.count(0)
    median = round(np.percentile(time_analysis_list,50))
    average = round(sum(time_analysis_list)/len(time_analysis_list))
    distribution95 = round(np.percentile(time_analysis_list,95))
    else:
    totalcount = 0
    errorcount = 0
    median = 0
    average = 0
    distribution95 = 0

    sql = """
    REPLACE into time_analysis( datetime, application_name, totalcount,errorcount, median, average, distribution95)
    VALUES ("{}", "{}", {}, {}, {}, {}, {});""".format(update_time,appname,totalcount,errorcount,median,average,distribution95)
    self.db.db_execute(sql)

    def update_app(self):
    """更新application
    """
    appdict = self.get_applications()
    apps = appdict.get("applicationList")
    update_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
    for app in apps:
    if app['applicationName'].startswith('test'):
    continue
    agents, agentlists = self.getAgentList(app['applicationName'])
    sql = """
    REPLACE into application_list( application_name,
    service_type, code, agents, agentlists, update_time)
    VALUES ("{}", "{}", {}, {}, '{}', "{}");""".format(
    app['applicationName'], app['serviceType'],
    app['code'], agents, agentlists, update_time)
    self.db.db_execute(sql)
    return True

    def update_all_servermaps(self):
    """更新所有应用数
    """
    appdict = self.get_applications()
    apps = appdict.get("applicationList")
    for app in apps:
    self.update_servermap(app['applicationName'])
    ###删除3600天前数据
    Del_Time = datetime.datetime.now() + datetime.timedelta(days=-3600)

    sql = """delete from application_server_map where update_time <= "{}"
    """.format(Del_Time)
    self.db.db_execute(sql)
    return True


    def connect_db():
    """ 建立SQL连接
    """
    mydb = db.MyDB(
    host="********",
    user="******",
    passwd="******",
    db="******"
    )
    mydb.db_connect()
    mydb.db_cursor()
    return mydb

    def main():
    db = connect_db()
    pp = PinPoint(db)
    pp.update_app()
    pp.update_all_servermaps()
    db.db_close()


    if __name__ == '__main__':
    main()

    db.py

    import mysql.connector
    class MyDB(object):
    """docstring for MyDB"""
    def __init__(self, host, user, passwd , db):
    self.host = host
    self.user = user
    self.passwd = passwd
    self.db = db

    self.connect = None
    self.cursor = None
    def db_connect(self):
    """数据库连接
    """
    self.connect = mysql.connector.connect(host=self.host, user=self.user, passwd=self.passwd, database=self.db)
    return self
    def db_cursor(self):
    if self.connect is None:
    self.connect = self.db_connect()

    if not self.connect.is_connected():
    self.connect = self.db_connect()
    self.cursor = self.connect.cursor()
    return self
    def get_rows(self , sql):
    """ 查询数据库结果
    :param sql: SQL语句
    :param cursor: 数据库游标
    """
    self.cursor.execute(sql)
    return self.cursor.fetchall()
    def db_execute(self, sql):
    self.cursor.execute(sql)
    self.connect.commit()
    def db_close(self):
    """关闭数据库连接和游标
    :param connect: 数据库连接实例
    :param cursor: 数据库游标
    """
    if self.connect:
    self.connect.close()
    if self.cursor:
    self.cursor.close()
  • 相关阅读:
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法提高 P0404
    Java实现 蓝桥杯VIP 算法训练 排列问题
    Java实现 蓝桥杯VIP 算法训练 排列问题
    Java实现 蓝桥杯VIP 算法训练 排列问题
    Java实现 蓝桥杯VIP 算法训练 排列问题
    关于模态/非模态对话框不响应菜单的UPDATE_COMMAND_UI消息(对对WM_INITMENUPOPUP消息的处理)
  • 原文地址:https://www.cnblogs.com/net2817/p/14700352.html
Copyright © 2011-2022 走看看