locust版本:1.1.1
参考地址:https://www.jianshu.com/p/c6802a88beaf
locust 性能脚本 coupon_test.py
from locust import task, between, tag from locust.contrib.fasthttp import FastHttpUser from public.common import setup_request, store, coupon # 参数化函数、加密处理 from stats.listener import * # hook函数 class MyUser(FastHttpUser): host = "" wait_time = between(0, 0) @tag("getCustomerCouponList") @task def get_customer_coupon_list(self): data = {"status": 0, "pageSize": 15, "pageIndex": 1} req = {"body": data, "params": {}, "headers": {}} req = setup_request(req, self.host) with self.client.post("/api/coupons/app/getCustomerCouponList", headers=req["headers"], json=req["body"], name="/api/coupons/app/getCustomerCouponList", catch_response=True) as res: try: response = res.json() except Exception as e: logger.error("=======/api/coupons/app/getCustomerCouponList=== Exception ====>>>> {} >>>> {}".format(res.text, e)) res.failure("json error!") return try: if response["success"] and response["message"] == "请求成功": res.success() else: logger.error("=======/api/coupons/app/getCustomerCouponList=== 断言 ====>>>> {}".format(response)) res.failure("断言失败!") except KeyError as e: logger.error("=======/api/coupons/app/getCustomerCouponList=== KeyError ====>>>> {} >>>> {}".format(res.text, e)) res.failure("key error!")
......
--headless模式执行命令
locust -f testcasescouponcoupon_test.py --headless -u 10 -r 3 -t 1m -H https://xxxxx.com --logfile=loglocust.log --loglevel=INFO 1>log un.log 2>&1 --master
生成run.log文件格式
读取日志文件并写入 influxdb 数据库
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-23 10:32
# @Author : lixiaofeng
# @Site :
# @File : helper.py
# @Software: PyCharm
import time import osimport jsonimport re import platform from io import StringIO from public.connect_influxdb import TestInflux def read_performance(t): """ 读取日志中的性能数据 :param t: 时间 time.time() :return: """ t1 = h.getvalue() if t1 and t - float(t1[:18]) < 5: # 5秒写入一次数据库 return h.seek(0) h.write(str(t)) performance_path = os.path.join(BASE_DIR, "log" + pattern + "run.log") with io.open(performance_path) as f: data_list = f.readlines() locust_list = [] i = 0 for data in data_list: res = re.match( r'^s+(?P<method>GET|POST)s+(?P<api>[/w?=-&]+)s+(?P<reqs>d+)s+(?P<fails>[d(.)\%]+)s+(?P<Avg>d+)s+(?P<Min>d+)s+(?P<Max>d+)s+(|)s+(?P<Median>d+)s+(?P<qps>[d(.)\%]+)s+(?P<failures>[d(.)\%]+)$', data) # 正则,匹配数据 if res: i += 1 # print(res.group('method'), res.group('api'), res.group('reqs'), res.group('fails'), res.group('Avg'), # res.group('Min'), res.group('Max'), res.group('Median'), res.group('qps'), res.group('failures')) method = res.group('method') api = res.group('api') reqs = res.group('reqs') fails = res.group('fails') Avg = res.group('Avg') Min = res.group('Min') Max = res.group('Max') Median = res.group('Median') qps = res.group('qps') failures = res.group('failures') locust_dict = {'Type': method, 'Name': api, 'Requests': reqs, 'Fails': fails, 'Average_ms': Avg, 'Min_ms': Min, 'Max_ms': Max, 'Median_ms': Median, 'Current_RPS': qps, 'Failures_s': failures} locust_list.append(locust_dict) influx.post_dump_data(locust_list)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-31 13:21
# @Author : lixiaofeng
# @Site :
# @File : connect_influxdb.py
# @Software: PyCharm
from influxdb import InfluxDBClient class TestInflux: """ 连接 influxdb ,并写入数据 """ def __init__(self): self.influx_client = InfluxDBClient('localhost', 8086, '', '', 'mydb') self.make = False for database in self.influx_client.get_list_database(): if "mydb" not in database["name"]: self.make = True else: # self.influx_client.drop_measurement("locust") # 写入前删除数据 self.make = False if self.make: self.influx_client.create_database("mydb") def post_dump_data(self, data): """ :param data: 数据 list :return: """ if isinstance(data, list): for key in data: json_body = [ { "measurement": "locust", # "tags": key, "fields": key } ] self.influx_client.write_points(json_body)
在hook函数中对 read_performance 函数进行调用
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2020-07-29 11:20 # @Author : lixiaofeng # @Site : # @File : listener.py # @Software: PyCharm import time from locust import events from loguru import logger from public.helper import read_performance @events.quitting.add_listener def process_exit(environment, **kwargs): """ locust 退出时调用 :param environment: :param kwargs: :return: """ if environment.stats.total.fail_ratio > 0.01: logger.error("Test failed due to failure ratio > 1%") environment.process_exit_code = 1 elif environment.stats.total.avg_response_time > 200: logger.error("Test failed due to average response time ratio > 200 ms") environment.process_exit_code = 1 elif environment.stats.total.get_response_time_percentile(0.95) > 800: logger.error("Test failed due to 95th percentile response time > 800 ms") environment.process_exit_code = 1 else: environment.process_exit_code = 0 read_performance(time.time()) @events.request_success.add_listener def success(request_type, name, response_time, response_length, **kwargs): """ 接口 请求成功时调用 :param request_type: :param name: :param response_time: :param response_length: :param kwargs: :return: """ read_performance(time.time()) @events.request_failure.add_listener def failure(request_type, name, response_time, response_length, **kwargs): """ 接口 请求失败时调用 :param request_type: :param name: :param response_time: :param response_length: :param kwargs: :return: """ read_performance(time.time())
influxdb插入的数据
配置Grafana<安装步骤网上有详细教程,不再赘述>
设置influxdb数据源
面板中选择 可视化 table 展示,使用sql查询数据
select Type,"Name",Requests,Fails,Average_ms,Min_ms,Max_ms,Median_ms,Current_RPS,Failures_s from locust order by time desc limit 3
和web模式对比
~~~【完】