zoukankan      html  css  js  c++  java
  • 针对 WebSocket 协议的 Locust 压测脚本实现(基于 Locust 1.0 以上版本)

    Locust 默认支持 HTTP 协议(默认通过 HttpUser 类),我们也可以自行实现任意协议的 Client 对它 User 类进行继承(HttpUser 也是继承自 User)并增加所需要的方法,这样也就实现了任意协议的压测。

    针对 WebSocket 协议的 Locust 压测脚本实现无非就是三个步骤

    1. 编写一个 WebSocket Client,也就是定义一个 Class,实现 WS连接初始化、事件订阅、消息接收 所需要的方法
    2. 使用 WebSocket Client 继承 User 类,产生 WebsocketUser
    3. 依据测试用例编写压测脚本,使用 WebsocketUser内预定义的方法 实现并发的连接、事件订阅、消息接收

    脚本实现参考

    from locust import User, task, events, constant
    import time
    import websocket
    import ssl
    import json
    import jsonpath
    
    def eventType_success(eventType, recvText, total_time):
        events.request_success.fire(request_type="[RECV]",
                                    name=eventType,
                                    response_time=total_time,
                                    response_length=len(recvText))
    
    class WebSocketClient(object):
        
        _locust_environment = None
        
        def __init__(self, host):
            self.host = host
            # 针对 WSS 关闭 SSL 校验警报
            self.ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
            
        def connect(self, burl):
            start_time = time.time()
            try:
                self.conn = self.ws.connect(url=burl)
            except websocket.WebSocketConnectionClosedException as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(
                    request_type="[Connect]", name='Connection is already closed', response_time=total_time, exception=e)
            except websocket.WebSocketTimeoutException as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(
                    request_type="[Connect]", name='TimeOut', response_time=total_time, exception=e)
            else:
                total_time = int((time.time() - start_time) * 1000)
                events.request_success.fire(
                    request_type="[Connect]", name='WebSocket', response_time=total_time, response_length=0)
            return self.conn
            
        def recv(self):
            return self.ws.recv()
            
        def send(self, msg):
            self.ws.send(msg)
            
    class WebsocketUser(User):
        abstract = True
        def __init__(self, *args, **kwargs):
            super(WebsocketUser, self).__init__(*args, **kwargs)
            self.client = WebSocketClient(self.host)
            self.client._locust_environment = self.environment
            
    class ApiUser(WebsocketUser):
        host = "wss://ws.xxxxx.com/"
        wait_time = constant(0)
        
        @task(1)
        def pft(self):
            # wss 地址
            self.url = 'wss://ws.xxxxx.com/ws?appid=futures&uid=10000000'
            self.data = {}
            self.client.connect(self.url)
            
            # 发送的订阅请求
            sendMsg = '{"appid":"futures","cover":0,"event":[
                {"type":"exchange_rate","toggle":1,"expireTime":86400},
                {"type":"accountInfo_USDT","toggle":1,"expireTime":86400},
                {"type":"ticker_BTC/USDT","toggle":1,"expireTime":86400}]}'
            self.client.send(sendMsg)
            
            while True:
                # 消息接收计时
                start_time = time.time()
                recv = self.client.recv()
                total_time = int((time.time() - start_time) * 1000)
                
                # 为每个推送过来的事件进行归类和独立计算性能指标
                try:
                    recv_j = json.loads(recv)
                    eventType_s = jsonpath.jsonpath(recv_j, expr='$.eventType')
                    eventType_success(eventType_s[0], recv, total_time)
                except websocket.WebSocketConnectionClosedException as e:
                    events.request_failure.fire(request_type="[ERROR] WebSocketConnectionClosedException",
                                                name='Connection is already closed.',
                                                response_time=total_time,
                                                exception=e)
                except:
                    print(recv)
                    # 正常 OK 响应,或者其它心跳响应加入进来避免当作异常处理
                    if 'ok' in recv:
                        eventType_success('ok', 'ok', total_time)
    
    

    class WebSocketClient

    1. 实现了 WebSocket 的所有行为方法,包括连接初始化、消息发送(订阅)、消息接收
    2. 对连接过程中的异常进行捕获统计,记录异常响应的时间,便于后续测试分析
    3. 这段脚本基本拷贝就能用:)

    class WebsocketUser

    1. 继承 Locust 的 user 成为 WebsocketUser

    class ApiUser

    1. 在这里加载 WebsocketUser,初始化的 user,发送订阅请求、并在一个死循环内接收消息推送内容
    2. 对接收的消息内容(json格式)进行解析,最终可以在 WEB UI 看到各种推送事件的推送统计
    3. 对接收推送过程中的异常进行捕获,记录异常响应的时间,便于后续测试分析

    也可以在死循环内加入心跳发送,但建议建议按照规则发送,避免过于频繁,此处略

    压测过程

  • 相关阅读:
    记录一些经常被忽略的结论
    Eclipse 各种问题解决记录
    Feign 动态URL 解决记录
    Nacos 启动失败
    多git账号配置解决方案
    记一次java.lang.StackOverflowError
    StringBuilder 以及 StringBuffer默认大小与扩容
    MySQL索引背后的数据结构及原理
    我没有高并发项目经验,但是面试的时候经常被问到高并发、性能调优方面的问题,有什么办法可以解决吗?
    istio 学习之 手动注入sidecar
  • 原文地址:https://www.cnblogs.com/huanghaopeng/p/13187807.html
Copyright © 2011-2022 走看看