# #!/usr/bin/env python
# # -*- coding: utf8 -*-
import socket,re
from prometheus_client import generate_latest, Gauge,Info
from prometheus_client.core import CollectorRegistry
from psutil import virtual_memory
from psutil import cpu_times
def check_port(ip, port):
'''socket检测端口连通性'''
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2)
try:
s.connect((ip, int(port)))
s.shutdown(2)
return True
except:
return False
REGISTRY = CollectorRegistry(auto_describe=False)
allportdata = [{'ip':'10.20.44.61','port':22},{'ip':'10.20.44.62','port':22},{'ip':'10.20.44.63','port':22}]
mem_percent = Gauge(
"system_memory_percent",
"Total system memory percent.",
registry=REGISTRY)
cpu_per = Gauge(
"system_cpu_percent",
"Total system cpu percent.",
registry=REGISTRY)
mem_percent.set(virtual_memory().percent)
cpu_per.set(cpu_times().system)
METRIC_NAME_RE = re.compile(r'^[a-zA-Z_:][a-zA-Z0-9_:]*$')
name = 'ip_port_conn_status'
documentation = "IP PORT SOCKET CONNECT STATUS."
portcheck = Gauge(name=name, documentation=documentation, labelnames=('ip', 'port'), registry=REGISTRY)
for i in allportdata:
status = check_port(i['ip'], i['port'])
portcheck.labels(i['ip'], i['port']).inc(status)
result = generate_latest(REGISTRY).decode()
print(result)
Python prometheus-client 安装
pip install prometheus-client
Python封装
# encoding: utf-8
from prometheus_client import Counter, Gauge, Summary
from prometheus_client.core import CollectorRegistry
from prometheus_client.exposition import choose_encoder
class Monitor:
def __init__(self):
# 注册收集器&最大耗时map
self.collector_registry = CollectorRegistry(auto_describe=False)
self.request_time_max_map = {}
# 接口调用summary统计
self.http_request_summary = Summary(name="http_server_requests_seconds",
documentation="Num of request time summary",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 接口最大耗时统计
self.http_request_max_cost = Gauge(name="http_server_requests_seconds_max",
documentation="Number of request max cost",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 请求失败次数统计
self.http_request_fail_count = Counter(name="http_server_requests_error",
documentation="Times of request fail in total",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 模型预测耗时统计
self.http_request_predict_cost = Counter(name="http_server_requests_seconds_predict",
documentation="Seconds of prediction cost in total",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 图片下载耗时统计
self.http_request_download_cost = Counter(name="http_server_requests_seconds_download",
documentation="Seconds of download cost in total",
labelnames=("method", "code", "uri"),
registry=self.collector_registry)
# 获取/metrics结果
def get_prometheus_metrics_info(self, handler):
encoder, content_type = choose_encoder(handler.request.headers.get('accept'))
handler.set_header("Content-Type", content_type)
handler.write(encoder(self.collector_registry))
self.reset_request_time_max_map()
# summary统计
def set_prometheus_request_summary(self, handler):
self.http_request_summary.labels(handler.request.method, handler.get_status(), handler.request.path).observe(handler.request.request_time())
self.set_prometheus_request_max_cost(handler)
# 自定义summary统计
def set_prometheus_request_summary_customize(self, method, status, path, cost_time):
self.http_request_summary.labels(method, status, path).observe(cost_time)
self.set_prometheus_request_max_cost_customize(method, status, path, cost_time)
# 失败统计
def set_prometheus_request_fail_count(self, handler, amount=1.0):
self.http_request_fail_count.labels(handler.request.method, handler.get_status(), handler.request.path).inc(amount)
# 自定义失败统计
def set_prometheus_request_fail_count_customize(self, method, status, path, amount=1.0):
self.http_request_fail_count.labels(method, status, path).inc(amount)
# 最大耗时统计
def set_prometheus_request_max_cost(self, handler):
requset_cost = handler.request.request_time()
if self.check_request_time_max_map(handler.request.path, requset_cost):
self.http_request_max_cost.labels(handler.request.method, handler.get_status(), handler.request.path).set(requset_cost)
self.request_time_max_map[handler.request.path] = requset_cost
# 自定义最大耗时统计
def set_prometheus_request_max_cost_customize(self, method, status, path, cost_time):
if self.check_request_time_max_map(path, cost_time):
self.http_request_max_cost.labels(method, status, path).set(cost_time)
self.request_time_max_map[path] = cost_time
# 预测耗时统计
def set_prometheus_request_predict_cost(self, handler, amount=1.0):
self.http_request_predict_cost.labels(handler.request.method, handler.get_status(), handler.request.path).inc(amount)
# 自定义预测耗时统计
def set_prometheus_request_predict_cost_customize(self, method, status, path, cost_time):
self.http_request_predict_cost.labels(method, status, path).inc(cost_time)
# 下载耗时统计
def set_prometheus_request_download_cost(self, handler, amount=1.0):
self.http_request_download_cost.labels(handler.request.method, handler.get_status(), handler.request.path).inc(amount)
# 自定义下载耗时统计
def set_prometheus_request_download_cost_customize(self, method, status, path, cost_time):
self.http_request_download_cost.labels(method, status, path).inc(cost_time)
# 校验是否赋值最大耗时map
def check_request_time_max_map(self, uri, cost):
if uri not in self.request_time_max_map:
return True
if self.request_time_max_map[uri] < cost:
return True
return False
# 重置最大耗时map
def reset_request_time_max_map(self):
for key in self.request_time_max_map:
self.request_time_max_map[key] = 0.0
调用
import tornado
import tornado.ioloop
import tornado.web
import tornado.gen
from datetime import datetime
from tools.monitor import Monitor
global g_monitor
class ClassifierHandler(tornado.web.RequestHandler):
def post(self):
# TODO Something you need
# work....
# 统计Summary,包括请求次数和每次耗时
g_monitor.set_prometheus_request_summary(self)
self.write("OK")
class PingHandler(tornado.web.RequestHandler):
def head(self):
print('INFO', datetime.now(), "/ping Head.")
g_monitor.set_prometheus_request_summary(self)
self.write("OK")
def get(self):
print('INFO', datetime.now(), "/ping Get.")
g_monitor.set_prometheus_request_summary(self)
self.write("OK")
class MetricsHandler(tornado.web.RequestHandler):
def get(self):
print('INFO', datetime.now(), "/metrics Get.")
g_monitor.set_prometheus_request_summary(self)
# 通过Metrics接口返回统计结果
g_monitor.get_prometheus_metrics_info(self)
def make_app():
return tornado.web.Application([
(r"/ping?", PingHandler),
(r"/metrics?", MetricsHandler),
(r"/work?", ClassifierHandler)
])
if __name__ == "__main__":
g_monitor = Monitor()
app = make_app()
app.listen(port)
tornado.ioloop.IOLoop.current().start()
Metrics返回结果实例