zoukankan      html  css  js  c++  java
  • locust websocket 测试购买demo

    from locust import User, task, events, constant
    import time,requests
    import websocket
    import ssl
    import json
    import jsonpath
    import json
    
    
    def eventType_success(eventType, recvText, total_time):
        """Report a successfully received message to Locust.

        Fires ``events.request_success`` with request type ``"[RECV]"``, using
        *eventType* as the entry name, *total_time* as the response time and
        ``len(recvText)`` as the response length.
        """
        fire_success = events.request_success.fire
        fire_success(
            request_type="[RECV]",
            name=eventType,
            response_time=total_time,
            response_length=len(recvText),
        )
    
    
    class WebSocketClient(object):
        """Thin wrapper around ``websocket.WebSocket`` that reports connects to Locust.

        ``connect`` times the connection attempt and fires a Locust success or
        failure event; ``recv`` and ``send`` delegate straight to the wrapped
        socket without reporting anything.
        """

        # Assigned from outside after construction; not read inside this class.
        _locust_environment = None
        # Result of the most recent connect attempt. The class-level default
        # guarantees ``connect`` can always return it, even after a failure.
        conn = None

        def __init__(self, host):
            """Store *host* and create the (not yet connected) websocket."""
            self.host = host
            # Disable SSL certificate verification for WSS connections.
            self.ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})

        @staticmethod
        def _elapsed_ms(start_time):
            """Return whole milliseconds elapsed since *start_time* (a ``time.time()`` value)."""
            return int((time.time() - start_time) * 1000)

        @classmethod
        def _report_connect_failure(cls, name, start_time, exc):
            """Fire a ``[Connect]`` failure event for *exc* under the entry *name*."""
            events.request_failure.fire(
                request_type="[Connect]",
                name=name,
                response_time=cls._elapsed_ms(start_time),
                # Passed for parity with the success event; presumably required
                # by Locust's own failure handler -- confirm for the version in use.
                response_length=0,
                exception=exc,
            )

        def connect(self, burl):
            """Connect the wrapped websocket to *burl* and report the outcome.

            A closed-connection or timeout error from the websocket library is
            reported as a Locust failure and swallowed; any other exception
            propagates to the caller.

            Returns:
                Whatever the underlying ``connect`` call returned, or ``None``
                when the attempt failed with one of the handled exceptions.
            """
            # Reset first so a failed attempt never returns a stale (or missing) value.
            self.conn = None
            start_time = time.time()
            try:
                self.conn = self.ws.connect(url=burl)
            except websocket.WebSocketConnectionClosedException as e:
                self._report_connect_failure('Connection is already closed', start_time, e)
            except websocket.WebSocketTimeoutException as e:
                self._report_connect_failure('TimeOut', start_time, e)
            else:
                events.request_success.fire(
                    request_type="[Connect]",
                    name='WebSocket',
                    response_time=self._elapsed_ms(start_time),
                    response_length=0,
                )
            return self.conn

        def recv(self):
            """Return the next message received on the wrapped websocket."""
            return self.ws.recv()

        def send(self, msg):
            """Send *msg* over the wrapped websocket."""
            self.ws.send(msg)
    
    
    class WebsocketUser(User):
        """Abstract Locust user whose ``client`` attribute is a ``WebSocketClient``.

        The client is built from ``self.host`` and is handed the user's
        ``environment`` through its ``_locust_environment`` attribute.
        """

        abstract = True

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            ws_client = WebSocketClient(self.host)
            self.client = ws_client
            ws_client._locust_environment = self.environment
    
    
    def getid(timeout=10):
        """Fetch a user id from the user service.

        Sends a GET request to the ``Getuserid`` endpoint and returns the value
        stored under ``"data"`` in the JSON response body.

        Args:
            timeout: Seconds to wait for the service before giving up. Without
                a timeout ``requests`` waits indefinitely, which would stall
                the calling load-test user on an unresponsive service.

        Raises:
            requests.exceptions.Timeout: If the service does not answer in time.
            KeyError: If the JSON object has no ``"data"`` key.
        """
        response = requests.get(
            'http://192.168.0.153:8000/api/user/Getuserid/',
            timeout=timeout,
        )
        payload = response.json()
        return payload["data"]
    
    
    def getuserid(userid, level=5, timeout=10):
        """Fetch a token for *userid* from the user service.

        Despite its name, this returns the value stored under ``"data"`` in the
        JSON response of the ``GetRoboToken`` endpoint, which the caller uses
        as a token.

        Args:
            userid: Sent as the ``id`` query parameter.
            level: Sent as the ``level`` query parameter; defaults to the
                previously hard-coded value 5.
            timeout: Seconds to wait for the service before giving up. Without
                a timeout ``requests`` waits indefinitely.

        Raises:
            requests.exceptions.Timeout: If the service does not answer in time.
            KeyError: If the JSON object has no ``"data"`` key.
        """
        response = requests.get(
            'http://192.168.0.153:8000/api/user/GetRoboToken/',
            params={"id": userid, "level": level},
            timeout=timeout,
        )
        payload = response.json()
        return payload["data"]
    
    
    class ApiUser(WebsocketUser):
        """Locust user that opens a websocket and sends one request per task run.

        NOTE(review): ``host`` is a ``wss://`` URL, but ``ws`` connects to the
        hard-coded ``ws://`` URL it builds itself, so ``host`` does not decide
        where the traffic goes -- confirm this is intended.
        """

        host = "wss://ws.xxxxx.com/"
        wait_time = constant(0)

        @task()
        def ws(self):
            """Fetch a user id and token, connect with the token, send one message.

            NOTE(review): ``self.data`` is assigned here but never read in this
            class; a new connection is opened on every run and not closed here.
            """
            token = getuserid(getid())

            # Websocket endpoint; the token is passed as the ``token`` query parameter.
            self.url = 'ws://47.243.58.228:8000/api/Primarymarket/ws?token=' + token

            self.data = {}
            self.client.connect(self.url)

            # Subscription request to send.
            self.client.send('{"Number":1,"Cardcaseid":1}')
  • 相关阅读:
    FirstThunk
    FirstThunk
    输入地址表(IAT)
    PE文件讲解
    PE格式详细讲解
    pe结构讲解
    PE格式详细讲解
    输入地址表(IAT)
    pe结构讲解
    zindex可以使用负值
  • 原文地址:https://www.cnblogs.com/Sunbreaker/p/15117355.html
Copyright © 2011-2022 走看看