zoukankan      html  css  js  c++  java
  • 使用python3抓取pinpoint应用信息入库

    使用python3抓取pinpoint应用信息入库

    Pinpoint是用Java编写的、面向大型分布式系统的APM(应用程序性能管理)工具。受Dapper的启发,Pinpoint提供了一种解决方案:通过在分布式应用程序中跟踪事务,帮助分析系统的整体结构以及各组件之间的相互关系。

    pinpoint api:

    • /applications.pinpoint 获取applications基本信息
    • /getAgentList.pinpoint 获取对应application agent信息
    • /getServerMapData.pinpoint 获取对应app 基本数据流信息

    db.py

    import mysql.connector
    class MyDB(object):
        """Small helper around one ``mysql.connector`` connection and cursor.

        The constructor only stores the connection settings; the connection is
        opened by ``db_connect`` and the cursor by ``db_cursor``.

        :param host: MySQL server host name
        :param user: login user
        :param passwd: login password
        :param db: name of the database to use
        """

        def __init__(self, host, user, passwd, db):
            self.host = host
            self.user = user
            self.passwd = passwd
            self.db = db

            # Live connection / cursor objects; both stay None until opened.
            self.connect = None
            self.cursor = None

        def db_connect(self):
            """Open a new connection using the stored settings.

            :return: this ``MyDB`` instance (not the connection), so calls
                can be chained
            """
            self.connect = mysql.connector.connect(
                host=self.host, user=self.user, passwd=self.passwd,
                database=self.db)
            return self

        def db_cursor(self):
            """Create a cursor, (re)connecting first when necessary.

            :return: this ``MyDB`` instance
            """
            # db_connect() stores the connection on self.connect itself and
            # returns this MyDB instance, so its return value must NOT be
            # assigned to self.connect (that replaced the connection with the
            # wrapper and broke the is_connected() call below).
            if self.connect is None or not self.connect.is_connected():
                self.db_connect()
            self.cursor = self.connect.cursor()
            return self

        def _run(self, sql, params=None):
            """Execute ``sql`` on the cursor, opening the cursor on demand."""
            if self.cursor is None:
                self.db_cursor()
            if params is None:
                self.cursor.execute(sql)
            else:
                self.cursor.execute(sql, params)

        def get_rows(self, sql, params=None):
            """Run a query and return every result row.

            :param sql: SQL statement
            :param params: optional values for the placeholders in ``sql``;
                when given they are passed to the cursor separately instead of
                being formatted into the statement text
            :return: whatever the cursor's ``fetchall()`` returns
            """
            self._run(sql, params)
            return self.cursor.fetchall()

        def db_execute(self, sql, params=None):
            """Run a write statement and commit it.

            :param sql: SQL statement
            :param params: optional values for the placeholders in ``sql``
            """
            self._run(sql, params)
            self.connect.commit()

        def db_close(self):
            """Close the cursor and then the connection, if they are open.

            The cursor is closed first because it belongs to the connection.
            Both attributes are reset to None so a later ``db_cursor()`` call
            reconnects instead of reusing closed handles.
            """
            if self.cursor:
                self.cursor.close()
                self.cursor = None
            if self.connect:
                self.connect.close()
                self.connect = None

    pinpoint.py:

     
    # -*- coding: utf-8 -*-
    
    '''
    Copyright (c) 2018, mersap
    All rights reserved.
    
    摘    要: pinpoint.py
    创 建 者: mersap
    创建日期: 2019-01-17
    '''
    
    import sys
    import requests
    import time
    import datetime
    import json
    
    sys.path.append('../Golf')
    import db #db.py
    
    # Base URL of the Pinpoint web server; API paths are appended to it.
    PPURL = "https://pinpoint.*******.com"


    # Query window: the 60 seconds ending at the moment this module is loaded.
    # A single reference instant is taken so that the datetime values and the
    # timestamps below describe exactly the same window.
    To_Time = datetime.datetime.now()
    From_Time = To_Time + datetime.timedelta(seconds=-60)
    # The same window as whole-second local-time epoch values, in milliseconds.
    From_TimeStamp = int(time.mktime(From_Time.timetuple()))*1000
    To_TimeStamp = int(time.mktime(To_Time.timetuple()))*1000
    
    
    class PinPoint(object):
        """Read application data from the Pinpoint web API and store it in MySQL.

        :param db: database helper; it must expose an open ``cursor`` and its
            ``connect`` connection as attributes (``db.MyDB`` does after
            ``db_cursor()`` has been called)
        """

        # Seconds to wait for the Pinpoint server. Without a timeout
        # ``requests`` can block forever on an unresponsive host.
        REQUEST_TIMEOUT = 10

        def __init__(self, db):
            super(PinPoint, self).__init__()
            self.db = db

        def _get_json(self, path, params=None):
            """GET ``PPURL + path`` and return the decoded JSON body.

            :return: the parsed JSON, or ``None`` (after printing a notice)
                when the response status is not 200
            """
            res = requests.get(PPURL + path, params=params,
                               timeout=self.REQUEST_TIMEOUT)
            if res.status_code != 200:
                print("请求异常,请检查")
                return None
            return res.json()

        def _execute(self, sql, params):
            """Run one parameterized write statement and commit it.

            Values are handed to the cursor separately from the SQL text so
            that names coming from the remote API cannot break the statement
            (or inject SQL) through quote characters.
            """
            self.db.cursor.execute(sql, params)
            self.db.connect.commit()

        def get_applications(self):
            """Fetch the applications known to Pinpoint.

            :return: ``{"applicationList": [...]}`` holding the entries
                returned by ``/applications.pinpoint``, or ``None`` when the
                request failed
            """
            apps = self._get_json("/applications.pinpoint")
            if apps is None:
                return None
            return {"applicationList": list(apps)}

        def getAgentList(self, appname):
            """Fetch the agents registered for one application.

            :param appname: application name
            :return: ``(count, names_json)`` where ``names_json`` is the JSON
                encoded list of the response's top-level keys, or ``None``
                when the request failed
            """
            agents = self._get_json("/getAgentList.pinpoint",
                                    {'application': appname})
            if agents is None:
                return None
            names = list(agents.keys())
            return len(names), json.dumps(names)

        def update_servermap(self, appname , from_time=From_TimeStamp,
                             to_time=To_TimeStamp, serviceType='SPRING_BOOT'):
            '''Store the upstream/downstream links of one application.

            :param appname: application name
            :param from_time: window start, epoch milliseconds
            :param to_time: window end, epoch milliseconds
            :param serviceType: service type name sent to Pinpoint
            :return: ``None``; nothing is written when the request fails
            '''
            # Example request:
            # https://pinpoint.*****.com/getServerMapData.pinpoint?applicationName=test-app&from=1547721493000&to=1547721553000&callerRange=1&calleeRange=1&serviceTypeName=TOMCAT&_=1547720614229
            param = {
                'applicationName': appname,
                'from': from_time,
                'to': to_time,
                'callerRange': 1,
                'calleeRange': 1,
                'serviceTypeName': serviceType,
            }
            data = self._get_json("/getServerMapData.pinpoint", param)
            if data is None:
                return

            update_time = time.strftime('%Y-%m-%d %H:%M:%S')
            # Store the window that was actually queried, i.e. the one given by
            # the arguments rather than the module-level default window.
            window_start = datetime.datetime.fromtimestamp(from_time / 1000.0)
            window_end = datetime.datetime.fromtimestamp(to_time / 1000.0)

            sql = """
                REPLACE into application_server_map (application, serviceType,
                agents, to_application, to_serviceType, to_agents, totalCount,
                errorCount, slowCount, update_time, from_time, to_time)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""

            for link in data["applicationMapData"]["linkDataArray"]:
                source = link['sourceInfo']
                # Links that start at a "test" application are not stored.
                if source['applicationName'].startswith('test'):
                    continue
                target = link['targetInfo']
                # Row: application, its type, agent count, downstream
                # application, its type, its agent count, then the total /
                # error / slow request counts for this link.
                row = (
                    source['applicationName'],
                    source['serviceType'],
                    # A link without 'fromAgent' / 'toAgent' is counted as one
                    # agent (the default ' ' has length 1).
                    len(link.get('fromAgent', ' ')),
                    target['applicationName'],
                    target['serviceType'],
                    len(link.get('toAgent', ' ')),
                    link['totalCount'],
                    link['errorCount'],
                    link['slowCount'],
                    update_time,
                    window_start,
                    window_end,
                )
                self._execute(sql, row)

        def update_app(self):
            """Refresh the ``application_list`` table.

            Applications whose name starts with ``test`` are skipped, as are
            applications whose agent list could not be fetched.

            :return: ``True`` on success, ``False`` when the application list
                could not be fetched
            """
            appdict = self.get_applications()
            if appdict is None:
                return False
            update_time = time.strftime('%Y-%m-%d %H:%M:%S')
            sql = """
                REPLACE into application_list (application_name,
                service_type, code, agents, agentlists, update_time)
                VALUES (%s, %s, %s, %s, %s, %s)"""
            for app in appdict.get("applicationList"):
                name = app['applicationName']
                if name.startswith('test'):
                    continue
                agent_info = self.getAgentList(name)
                if agent_info is None:
                    # The failure was already reported; keep the other apps.
                    continue
                agents, agentlists = agent_info
                self._execute(sql, (name, app['serviceType'], app['code'],
                                    agents, agentlists, update_time))
            return True

        def update_all_servermaps(self):
            """Refresh the server map of every application, then purge old rows.

            Rows of ``application_server_map`` whose ``update_time`` is seven
            or more days old are deleted afterwards.

            :return: ``True`` on success, ``False`` when the application list
                could not be fetched (nothing is changed in that case)
            """
            appdict = self.get_applications()
            if appdict is None:
                return False
            for app in appdict.get("applicationList"):
                self.update_servermap(app['applicationName'],
                                      serviceType=app['serviceType'])
            # Drop data older than seven days.
            del_time = datetime.datetime.now() + datetime.timedelta(days=-7)
            self._execute(
                "delete from application_server_map where update_time <= %s",
                (del_time,))
            return True
    
    
    def connect_db(host="rm-*****.mysql.rds.aliyuncs.com", user="user",
                   passwd="passwd", database="pinpoint"):
        """Open the MySQL connection and cursor used by the sync job.

        The defaults are the values that used to be hard-coded here, so calling
        ``connect_db()`` without arguments behaves as before.

        :param host: MySQL server host name
        :param user: login user
        :param passwd: login password
        :param database: name of the database to use
        :return: a ``db.MyDB`` instance on which ``db_connect()`` and
            ``db_cursor()`` have been called
        """
        mydb = db.MyDB(host=host, user=user, passwd=passwd, db=database)
        mydb.db_connect()
        mydb.db_cursor()
        return mydb
    
    def main():
        """Refresh the application list and all server maps in the database."""
        # Named ``mydb`` so the imported ``db`` module is not shadowed.
        mydb = connect_db()
        try:
            pp = PinPoint(mydb)
            pp.update_app()
            pp.update_all_servermaps()
        finally:
            # Release the connection even when one of the updates raises.
            mydb.db_close()
    
    
    # Run the sync only when this file is executed as a script, not on import.
    if __name__ == "__main__":
        main()
    • 附sql语句
    
    -- One row per Pinpoint application; written by PinPoint.update_app()
    -- with REPLACE INTO, so application_name identifies the row.
    CREATE TABLE `application_list` (
      `application_name` varchar(32) NOT NULL,
      `service_type` varchar(32) DEFAULT NULL COMMENT '服务类型',
      `code` int(11) DEFAULT NULL COMMENT '服务类型代码',
      `agents` int(11) DEFAULT NULL COMMENT 'agent个数',
      `agentlists` varchar(256) DEFAULT NULL COMMENT 'agent list',
      `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
      PRIMARY KEY (`application_name`),
      -- NOTE(review): this index covers the same column as the primary key
      -- and looks redundant; kept so the schema stays unchanged.
      UNIQUE KEY `Unique_App` (`application_name`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='pinpoint app list';
    
    -- One row per (application, downstream application) link; written by
    -- PinPoint.update_servermap() with REPLACE INTO.
    CREATE TABLE `application_server_map` (
      `application` varchar(32) NOT NULL COMMENT '应用名称',
      -- Widened from varchar(8): the script's default type 'SPRING_BOOT' is
      -- 11 characters long. Now matches to_serviceType.
      `serviceType` varchar(32) NOT NULL,
      `agents` int(2) NOT NULL COMMENT 'agent个数',
      `to_application` varchar(32) NOT NULL COMMENT '下游服务名称',
      `to_serviceType` varchar(32) DEFAULT NULL COMMENT '下游服务类型',
      `to_agents` int(2) DEFAULT NULL COMMENT '下游服务agent数量',
      `totalCount` int(8) DEFAULT NULL COMMENT '总请求数',
      `errorCount` int(8) DEFAULT NULL,
      `slowCount` int(8) DEFAULT NULL,
      `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
      `from_time` datetime DEFAULT NULL,
      `to_time` datetime DEFAULT NULL,
      PRIMARY KEY (`application`,`to_application`),
      -- NOTE(review): this index covers the same columns as the primary key
      -- and looks redundant; kept so the schema stays unchanged.
      UNIQUE KEY `Unique_AppMap` (`application`,`to_application`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='应用链路数据';
  • 相关阅读:
    web.xml配置详解
    oracle按时间创建分区表
    cron表达式详解
    临时表
    配置非安装版tomcat服务
    CodeForces 785 D Anton and School
    CodeForces 601B Lipshitz Sequence
    CodeForces 590C Three States BFS
    CodeForces 592D Super M DP
    CodeForces 507E Breaking Good 2维权重dij
  • 原文地址:https://www.cnblogs.com/yum777/p/12155632.html
Copyright © 2011-2022 走看看