zoukankan      html  css  js  c++  java
  • 针对 WebSocket 协议的 Locust 压测脚本实现(基于 Locust 1.0 以上版本)

    针对 WebSocket 协议的 Locust 压测脚本实现(基于 Locust 1.0 以上版本)

     

    Locust 默认支持 HTTP 协议(默认通过 HttpUser 类),我们也可以自行实现任意协议的 Client 对它 User 类进行继承(HttpUser 也是继承自 User)并增加所需要的方法,这样也就实现了任意协议的压测。

    针对 WebSocket 协议的 Locust 压测脚本实现无非就是三个步骤

    1. 编写一个 WebSocket Client,也就是定义一个 Class,实现 WS连接初始化、事件订阅、消息接收 所需要的方法
    2. 使用 WebSocket Client 继承 User 类,产生 WebsocketUser
    3. 依据测试用例编写压测脚本,使用 WebsocketUser内预定义的方法 实现并发的连接、事件订阅、消息接收

    脚本实现参考

    from locust import User, task, events, constant
    import time
    import websocket
    import ssl
    import json
    import jsonpath
    
    def eventType_success(eventType, recvText, total_time):
        events.request_success.fire(request_type="[RECV]",
                                    name=eventType,
                                    response_time=total_time,
                                    response_length=len(recvText))
    
    class WebSocketClient(object):
        
        _locust_environment = None
        
        def __init__(self, host):
            self.host = host
            # 针对 WSS 关闭 SSL 校验警报
            self.ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
            
        def connect(self, burl):
            start_time = time.time()
            try:
                self.conn = self.ws.connect(url=burl)
            except websocket.WebSocketConnectionClosedException as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(
                    request_type="[Connect]", name='Connection is already closed', response_time=total_time, exception=e)
            except websocket.WebSocketTimeoutException as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(
                    request_type="[Connect]", name='TimeOut', response_time=total_time, exception=e)
            else:
                total_time = int((time.time() - start_time) * 1000)
                events.request_success.fire(
                    request_type="[Connect]", name='WebSocket', response_time=total_time, response_length=0)
            return self.conn
            
        def recv(self):
            return self.ws.recv()
            
        def send(self, msg):
            self.ws.send(msg)
            
    class WebsocketUser(User):
        abstract = True
        def __init__(self, *args, **kwargs):
            super(WebsocketUser, self).__init__(*args, **kwargs)
            self.client = WebSocketClient(self.host)
            self.client._locust_environment = self.environment
            
    class ApiUser(WebsocketUser):
        host = "wss://ws.xxxxx.com/"
        wait_time = constant(0)
        
        @task(1)
        def pft(self):
            # wss 地址
            self.url = 'wss://ws.xxxxx.com/ws?appid=futures&uid=10000000'
            self.data = {}
            self.client.connect(self.url)
            
            # 发送的订阅请求
            sendMsg = '{"appid":"futures","cover":0,"event":[
                {"type":"exchange_rate","toggle":1,"expireTime":86400},
                {"type":"accountInfo_USDT","toggle":1,"expireTime":86400},
                {"type":"ticker_BTC/USDT","toggle":1,"expireTime":86400}]}'
            self.client.send(sendMsg)
            
            while True:
                # 消息接收计时
                start_time = time.time()
                recv = self.client.recv()
                total_time = int((time.time() - start_time) * 1000)
                
                # 为每个推送过来的事件进行归类和独立计算性能指标
                try:
                    recv_j = json.loads(recv)
                    eventType_s = jsonpath.jsonpath(recv_j, expr='$.eventType')
                    eventType_success(eventType_s[0], recv, total_time)
                except websocket.WebSocketConnectionClosedException as e:
                    events.request_failure.fire(request_type="[ERROR] WebSocketConnectionClosedException",
                                                name='Connection is already closed.',
                                                response_time=total_time,
                                                exception=e)
                except:
                    print(recv)
                    # 正常 OK 响应,或者其它心跳响应加入进来避免当作异常处理
                    if 'ok' in recv:
                        eventType_success('ok''ok', total_time)
    
    

    class WebSocketClient

    1. 实现了 WebSocket 的所有行为方法,包括连接初始化、消息发送(订阅)、消息接收
    2. 对连接过程中的异常进行捕获统计,记录异常响应的时间,便于后续测试分析
    3. 这段脚本基本拷贝就能用:)

    class WebsocketUser

    1. 继承 Locust 的 user 成为 WebsocketUser

    class ApiUser

    1. 在这里加载 WebsocketUser,初始化的 user,发送订阅请求、并在一个死循环内接收消息推送内容
    2. 对接收的消息内容(json格式)进行解析,最终可以在 WEB UI 看到各种推送事件的推送统计
    3. 对接收推送过程中的异常进行捕获,记录异常响应的时间,便于后续测试分析

    也可以在死循环内加入心跳发送,但建议建议按照规则发送,避免过于频繁,此处略

    压测过程

     
     
  • 相关阅读:
    获取数据窗口的report对象 pb
    职业中的人与人。。。
    关于表格的设置
    今天学到的关于UI的一点东西
    给QQ发匿名消息
    这两天在改以前写的程序,很闷,记点流水账,:)
    我也要回家了,给大家道别
    时间过的好快啊,2月1号了,几个问题!
    开始新的一年的新生活..
    @dudu,临走问一下
  • 原文地址:https://www.cnblogs.com/xiao-xue-di/p/14859101.html
Copyright © 2011-2022 走看看