zoukankan      html  css  js  c++  java
  • 针对 WebSocket 协议的 Locust 压测脚本实现(基于 Locust 1.0 以上版本)

    Locust 默认支持 HTTP 协议(默认通过 HttpUser 类),我们也可以自行实现任意协议的 Client 对它 User 类进行继承(HttpUser 也是继承自 User)并增加所需要的方法,这样也就实现了任意协议的压测。

    针对 WebSocket 协议的 Locust 压测脚本实现无非就是三个步骤

    1. 编写一个 WebSocket Client,也就是定义一个 Class,实现 WS连接初始化、事件订阅、消息接收 所需要的方法
    2. 使用 WebSocket Client 继承 User 类,产生 WebsocketUser
    3. 依据测试用例编写压测脚本,使用 WebsocketUser内预定义的方法 实现并发的连接、事件订阅、消息接收

    脚本实现参考

    from locust import User, task, events, constant
    import time
    import websocket
    import ssl
    import json
    import jsonpath
    
    def eventType_success(eventType, recvText, total_time):
        events.request_success.fire(request_type="[RECV]",
                                    name=eventType,
                                    response_time=total_time,
                                    response_length=len(recvText))
    
    class WebSocketClient(object):
        
        _locust_environment = None
        
        def __init__(self, host):
            self.host = host
            # 针对 WSS 关闭 SSL 校验警报
            self.ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
            
        def connect(self, burl):
            start_time = time.time()
            try:
                self.conn = self.ws.connect(url=burl)
            except websocket.WebSocketConnectionClosedException as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(
                    request_type="[Connect]", name='Connection is already closed', response_time=total_time, exception=e)
            except websocket.WebSocketTimeoutException as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(
                    request_type="[Connect]", name='TimeOut', response_time=total_time, exception=e)
            else:
                total_time = int((time.time() - start_time) * 1000)
                events.request_success.fire(
                    request_type="[Connect]", name='WebSocket', response_time=total_time, response_length=0)
            return self.conn
            
        def recv(self):
            return self.ws.recv()
            
        def send(self, msg):
            self.ws.send(msg)
            
    class WebsocketUser(User):
        abstract = True
        def __init__(self, *args, **kwargs):
            super(WebsocketUser, self).__init__(*args, **kwargs)
            self.client = WebSocketClient(self.host)
            self.client._locust_environment = self.environment
            
    class ApiUser(WebsocketUser):
        host = "wss://ws.xxxxx.com/"
        wait_time = constant(0)
        
        @task(1)
        def pft(self):
            # wss 地址
            self.url = 'wss://ws.xxxxx.com/ws?appid=futures&uid=10000000'
            self.data = {}
            self.client.connect(self.url)
            
            # 发送的订阅请求
            sendMsg = '{"appid":"futures","cover":0,"event":[
                {"type":"exchange_rate","toggle":1,"expireTime":86400},
                {"type":"accountInfo_USDT","toggle":1,"expireTime":86400},
                {"type":"ticker_BTC/USDT","toggle":1,"expireTime":86400}]}'
            self.client.send(sendMsg)
            
            while True:
                # 消息接收计时
                start_time = time.time()
                recv = self.client.recv()
                total_time = int((time.time() - start_time) * 1000)
                
                # 为每个推送过来的事件进行归类和独立计算性能指标
                try:
                    recv_j = json.loads(recv)
                    eventType_s = jsonpath.jsonpath(recv_j, expr='$.eventType')
                    eventType_success(eventType_s[0], recv, total_time)
                except websocket.WebSocketConnectionClosedException as e:
                    events.request_failure.fire(request_type="[ERROR] WebSocketConnectionClosedException",
                                                name='Connection is already closed.',
                                                response_time=total_time,
                                                exception=e)
                except:
                    print(recv)
                    # 正常 OK 响应,或者其它心跳响应加入进来避免当作异常处理
                    if 'ok' in recv:
                        eventType_success('ok', 'ok', total_time)
    
    

    class WebSocketClient

    1. 实现了 WebSocket 的所有行为方法,包括连接初始化、消息发送(订阅)、消息接收
    2. 对连接过程中的异常进行捕获统计,记录异常响应的时间,便于后续测试分析
    3. 这段脚本基本拷贝就能用:)

    class WebsocketUser

    1. 继承 Locust 的 user 成为 WebsocketUser

    class ApiUser

    1. 在这里加载 WebsocketUser,初始化的 user,发送订阅请求、并在一个死循环内接收消息推送内容
    2. 对接收的消息内容(json格式)进行解析,最终可以在 WEB UI 看到各种推送事件的推送统计
    3. 对接收推送过程中的异常进行捕获,记录异常响应的时间,便于后续测试分析

    也可以在死循环内加入心跳发送,但建议建议按照规则发送,避免过于频繁,此处略

    压测过程

  • 相关阅读:
    Qt中将QTableView中的数据导出为Excel文件
    QTableView另类打印解决方案(复用render函数去解决print问题)
    [网络]_[批量下载网站文件]
    Codecs是以plugin的形式被调用的(显示中文的codec plugin文件是qcncodecs4.dll),可静态载入和动态载入
    Qt的Model/View Framework解析(数据是从真正的“肉(raw)”里取得,Model提供肉,所以读写文件、操作数据库、网络通讯等一系列与数据打交道的工作就在model中做了)
    MinGW 编译zlib、libpng、libjpeg、libcurl等(全都是Qt项目)
    Move WriteBuffer ReadBuffer String
    Delphi调试DLL 不能调试 不能进入调试 注意!!!
    ssh三大框架,三层架构 整合测试!完整分页代码,JdbcTemplate等测试,存储过程调用,留着以后复习吧
    Delphi 的动态数组
  • 原文地址:https://www.cnblogs.com/huanghaopeng/p/13187807.html
Copyright © 2011-2022 走看看