针对 WebSocket 协议的 Locust 压测脚本实现(基于 Locust 1.0 以上版本)
Locust 默认支持 HTTP 协议(默认通过 HttpUser 类),我们也可以自行实现任意协议的 Client 对它 User 类进行继承(HttpUser 也是继承自 User)并增加所需要的方法,这样也就实现了任意协议的压测。
针对 WebSocket 协议的 Locust 压测脚本实现无非就是三个步骤
- 编写一个 WebSocket Client,也就是定义一个 Class,实现 WS连接初始化、事件订阅、消息接收 所需要的方法
- 使用 WebSocket Client 继承 User 类,产生 WebsocketUser
- 依据测试用例编写压测脚本,使用 WebsocketUser内预定义的方法 实现并发的连接、事件订阅、消息接收
脚本实现参考
from locust import User, task, events, constant
import time
import websocket
import ssl
import json
import jsonpath
def eventType_success(eventType, recvText, total_time):
events.request_success.fire(request_type="[RECV]",
name=eventType,
response_time=total_time,
response_length=len(recvText))
class WebSocketClient(object):
_locust_environment = None
def __init__(self, host):
self.host = host
# 针对 WSS 关闭 SSL 校验警报
self.ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
def connect(self, burl):
start_time = time.time()
try:
self.conn = self.ws.connect(url=burl)
except websocket.WebSocketConnectionClosedException as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(
request_type="[Connect]", name='Connection is already closed', response_time=total_time, exception=e)
except websocket.WebSocketTimeoutException as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(
request_type="[Connect]", name='TimeOut', response_time=total_time, exception=e)
else:
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(
request_type="[Connect]", name='WebSocket', response_time=total_time, response_length=0)
return self.conn
def recv(self):
return self.ws.recv()
def send(self, msg):
self.ws.send(msg)
class WebsocketUser(User):
abstract = True
def __init__(self, *args, **kwargs):
super(WebsocketUser, self).__init__(*args, **kwargs)
self.client = WebSocketClient(self.host)
self.client._locust_environment = self.environment
class ApiUser(WebsocketUser):
host = "wss://ws.xxxxx.com/"
wait_time = constant(0)
@task(1)
def pft(self):
# wss 地址
self.url = 'wss://ws.xxxxx.com/ws?appid=futures&uid=10000000'
self.data = {}
self.client.connect(self.url)
# 发送的订阅请求
sendMsg = '{"appid":"futures","cover":0,"event":[
{"type":"exchange_rate","toggle":1,"expireTime":86400},
{"type":"accountInfo_USDT","toggle":1,"expireTime":86400},
{"type":"ticker_BTC/USDT","toggle":1,"expireTime":86400}]}'
self.client.send(sendMsg)
while True:
# 消息接收计时
start_time = time.time()
recv = self.client.recv()
total_time = int((time.time() - start_time) * 1000)
# 为每个推送过来的事件进行归类和独立计算性能指标
try:
recv_j = json.loads(recv)
eventType_s = jsonpath.jsonpath(recv_j, expr='$.eventType')
eventType_success(eventType_s[0], recv, total_time)
except websocket.WebSocketConnectionClosedException as e:
events.request_failure.fire(request_type="[ERROR] WebSocketConnectionClosedException",
name='Connection is already closed.',
response_time=total_time,
exception=e)
except:
print(recv)
# 正常 OK 响应,或者其它心跳响应加入进来避免当作异常处理
if 'ok' in recv:
eventType_success('ok', 'ok', total_time)
class WebSocketClient
- 实现了 WebSocket 的所有行为方法,包括连接初始化、消息发送(订阅)、消息接收
- 对连接过程中的异常进行捕获统计,记录异常响应的时间,便于后续测试分析
- 这段脚本基本拷贝就能用:)
class WebsocketUser
- 继承 Locust 的 user 成为 WebsocketUser
class ApiUser
- 在这里加载 WebsocketUser,初始化的 user,发送订阅请求、并在一个死循环内接收消息推送内容
- 对接收的消息内容(json格式)进行解析,最终可以在 WEB UI 看到各种推送事件的推送统计
- 对接收推送过程中的异常进行捕获,记录异常响应的时间,便于后续测试分析
也可以在死循环内加入心跳发送,但建议建议按照规则发送,避免过于频繁,此处略
压测过程

Camera2与TextureView使用
Collections常用方法总结
Android插件化框架
《战狼2》观后感——民族荣耀
《茶马古道》观后感——朝圣之路
点击查看大图Activity
图片压缩代码
《天那边》观后感——对一些现象的反思
recyclerView的使用
- 最新文章
-
pytho装饰器参数那些事_inspect.getcallargs
word2016打开2007文档出错
Excel去除单元格中的汉字
Excel小笔记——冻结窗格
Js日常笔记之数组
Js日常笔记之变量删除
Js日常笔记之this
C#对象深拷贝
NPOI(2.1.3)向excel中插入图片,xls文档图片插入成功,xlsx文档图片插入失败
sql关于group by之后把每一条记录的详情的某个字段值合并提取的方法
- 热门文章
-
SQL外键约束
重写(overwrite)、重载(overload)和覆盖(override)三者之间的区别
GET 对比 POST
OSI7层模型(TCP4层)
innodb 行级锁
HashMap,HashTable,ConcorrentHashMap的线程方式
encoder-decoder环境部署问题
LSTM java 实现
LSTM-based Encoder-Decoder for Multi-sensor Anomaly Detection
Learning Phrase Representations using RNN Encoder–Decoder for Statistical Machine Translation