  • Pitfalls I ran into while load-testing long-lived WebSocket connections

    WebSocket protocol load-test notes

    Background:

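        Our company's market-data system uses the WebSocket protocol. Clients can get the latest market data from the server in two ways: request and subscribe. A request is one-shot. With a subscription, once the connection is established the server pushes market data to the client at regular intervals.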
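The subscription scripts in this post all depend on two protocol details visible in their code: pushed frames arrive gzip-compressed, and the server periodically sends a `{"ping": <timestamp>}` heartbeat that the client answers with `{"pong": <timestamp>}`. A minimal sketch of that handling as pure functions, assuming every frame is gzip-compressed JSON; the names `decode_frame` and `reply_for` are hypothetical:

```python
import gzip
import json

# Subscription request used throughout this post (1-minute ETH/USDT klines).
SUB_MSG = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'


def decode_frame(raw: bytes) -> dict:
    """Decompress one binary frame and parse it as JSON (assumed frame format)."""
    return json.loads(gzip.decompress(raw).decode("utf-8"))


def reply_for(msg: dict) -> list:
    """Return the messages to send back for one decoded frame.

    A heartbeat {"ping": ts} is answered with {"pong": ts}; the scripts in
    this post also re-send the subscription right after the pong. Any other
    frame is pushed market data and needs no reply.
    """
    if "ping" in msg:
        return [json.dumps({"pong": msg["ping"]}), SUB_MSG]
    return []
```

For example, `reply_for(decode_frame(gzip.compress(b'{"ping": 1541830000000}')))` yields the pong `'{"pong": 1541830000000}'` followed by the subscription message.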

    Initial test plan:

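    WebSocket is full-duplex and keeps its connections open. This load test had two goals: find the maximum number of connections the system can establish, and confirm that, once connected, the server keeps pushing market data to every client.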
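Both metrics, how many connections are held at once and whether each connection keeps receiving pushes, can be tracked from inside the load script. A minimal thread-safe sketch; the class `ConnStats` and its methods are hypothetical, meant to be called from each connection's receive loop:

```python
import threading
from collections import Counter


class ConnStats:
    """Hypothetical thread-safe counters for a connection-count load test."""

    def __init__(self):
        self._lock = threading.Lock()
        self.open = 0            # connections currently established
        self.peak = 0            # highest concurrent count seen so far
        self.failed = 0          # connection attempts that failed
        self.pushes = Counter()  # connection id -> pushed frames received

    def connected(self, conn_id):
        with self._lock:
            self.open += 1
            self.peak = max(self.peak, self.open)
            self.pushes[conn_id] += 0  # register the id with zero pushes

    def closed(self):
        with self._lock:
            self.open -= 1

    def connect_failed(self):
        with self._lock:
            self.failed += 1

    def pushed(self, conn_id):
        with self._lock:
            self.pushes[conn_id] += 1

    def silent(self):
        """Connection ids that have not received a single push yet."""
        with self._lock:
            return [cid for cid, n in self.pushes.items() if n == 0]
```

`peak` answers the first question; a non-empty `silent()` list near the end of a run answers the second.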

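    For these reasons, I decided to open the connections from Python using multiple threads. To verify that pushed data actually arrives, the market data received is appended to a text file. The Python script is as follows: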

    import gzip
    import json
    import threading
    import time

    import websocket  # websocket-client package

    SERVER_URL = "ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws"
    SUB_MSG = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'


    def receive_loop(ws):
        """Subscribe, answer server heartbeats, and append every push to a file."""
        ws.send(SUB_MSG)
        while True:
            # The server sends gzip-compressed frames.
            result = gzip.decompress(ws.recv()).decode('utf-8')
            msg = json.loads(result)
            if "ping" in msg:
                # Heartbeat: echo the timestamp back as a pong, then re-subscribe.
                ws.send(json.dumps({"pong": msg["ping"]}))
                ws.send(SUB_MSG)
            else:
                with open('./test_result.txt', 'a') as f:
                    f.write(threading.current_thread().name + ' ' + result + '\n')


    def on_start():
        try:
            ws = websocket.create_connection(SERVER_URL)
        except Exception as e:
            print('connect ws error:', e)
            time.sleep(5)
            return
        # One thread per connection: each thread blocks in recv() on its own socket.
        t = threading.Thread(target=receive_loop, args=(ws,))
        t.start()
        print(t.name)


    if __name__ == "__main__":
        for _ in range(20):
            on_start()
            time.sleep(0.1)

    Initial test results:

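        During the test, once the number of connections reached a certain level (about 1,400 connections from a single machine), connections started dropping. Monitoring showed that the load generator's memory was almost exhausted. Every connection runs in its own thread and receives the pushed data, so memory consumption grows with the connection count until connections have to be dropped to free memory.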
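One rough way to see how much of the load generator's memory goes to the threads themselves is to start idle threads and watch the process's peak resident set size. This is a hypothetical measurement helper, assuming Linux (where `ru_maxrss` is reported in KiB); it also exercises `threading.stack_size()`, the standard-library knob for shrinking per-thread stacks, though whether a smaller stack helps depends on what actually filled the memory:

```python
import resource
import threading


def rss_growth_kib(n_threads, stack_size=0):
    """Start n idle threads and return how much peak RSS grew (KiB on Linux).

    Each thread just blocks on an Event, standing in for a thread parked in
    ws.recv(). stack_size=0 keeps the platform default; any other value must
    be at least 32 KiB. The previous stack size is restored afterwards.
    """
    old_size = threading.stack_size(stack_size)
    stop = threading.Event()
    started = []
    before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    try:
        for _ in range(n_threads):
            t = threading.Thread(target=stop.wait)
            t.start()
            started.append(t)
        after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    finally:
        stop.set()
        for t in started:
            t.join()
        threading.stack_size(old_size)
    return after - before
```

Comparing `rss_growth_kib(1000)` with the memory actually consumed at 1,400 connections gives a hint of whether the threads or the buffered data dominate.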

    Revised test plan:

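        After discussing it with the architects and developers, we decided to generate more load by using asynchronous I/O (AIO) on the WebSocket client side. Because the connections are long-lived, we had not considered this approach at first. Reading up on it, I found that WebSocket servers can use asynchronous I/O, so we tried the same on the client using Locust + Python; the material I found suggested this approach was feasible.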
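The point of the asynchronous approach is that one thread can hold many connections: each connection is a coroutine parked in an event loop instead of a thread parked in a blocking `recv()`. A minimal sketch with Python's standard `asyncio` (Locust itself uses gevent), assuming a plain-TCP stand-in for the market-data server; a real client would speak WebSocket through an asyncio library such as the third-party `websockets` package. The names `push_ticks`, `subscriber` and `run` are hypothetical:

```python
import asyncio


async def push_ticks(reader, writer, pushes):
    """Stand-in for the market-data server: push `pushes` lines, then hang up."""
    for _ in range(pushes):
        writer.write(b"tick\n")
        await writer.drain()
        await asyncio.sleep(0.01)
    writer.close()


async def subscriber(port):
    """One simulated client: a coroutine, not a thread, owning its own connection."""
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    received = 0
    while await reader.readline():  # b"" means the server closed the connection
        received += 1
    writer.close()
    return received


async def run(n_clients=100, pushes=3):
    server = await asyncio.start_server(
        lambda r, w: push_ticks(r, w, pushes), "127.0.0.1", 0, backlog=1024
    )
    port = server.sockets[0].getsockname()[1]
    async with server:
        # Every connection is held concurrently by this one thread's event loop.
        return await asyncio.gather(*(subscriber(port) for _ in range(n_clients)))
```

`asyncio.run(run(n_clients=50))` returns one push count per client, all collected without starting a single extra thread.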

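        Locust is an open-source, scalable, distributed performance-testing tool written in Python. It is convenient for testing HTTP APIs, and it can also test other protocols, but that requires overriding the Locust class. The script is as follows: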

    import gzip
    import json

    import websocket
    from locust import Locust, TaskSet, task

    WS_URL = "ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws"
    SUB_MSG = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'


    class WebSocketClient(object):
        def __init__(self, host, port):
            self.host = host
            self.port = port


    class WebSocketLocust(Locust):
        def __init__(self, *args, **kwargs):
            super(WebSocketLocust, self).__init__(*args, **kwargs)
            self.client = WebSocketClient("172.31.15.85", 9503)


    class UserBehavior(TaskSet):
        # Class attribute: ONE socket, connected at import time and shared by
        # every simulated user (greenlet). This is what triggers the error below.
        ws = websocket.WebSocket()
        ws.connect(WS_URL)

        @task(1)
        def buy(self):
            try:
                self.ws.send(SUB_MSG)
                while True:
                    result = gzip.decompress(self.ws.recv()).decode('utf-8')
                    msg = json.loads(result)
                    if "ping" in msg:
                        # Answer the heartbeat, then re-subscribe.
                        self.ws.send(json.dumps({"pong": msg["ping"]}))
                        self.ws.send(SUB_MSG)
                    else:
                        with open('./test_result.txt', 'a') as f:
                            f.write(result + '\n')
            except Exception as e:
                print("error is:", e)


    class ApiUser(WebSocketLocust):
        task_set = UserBehavior
        min_wait = 100
        max_wait = 200

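    Run the script with the following command (Locust 0.x syntax, matching the pre-1.0 API used above; Locust 1.0 and later use --headless and -u instead):

    locust -f websocket_client_locust.py --no-web -c 1 -r 1 -t 60s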

    A single user ran successfully and produced the file. With multiple users, however, it failed with the following error: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x7f01f0594900>>

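    The error says the socket is already being used. My first thought was that the code creates a new socket, so on a quick look this should not happen. But the socket the task actually uses is a single global (class-level) client, and that is where the error comes from. Thinking it through, I found the cause: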

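    gevent is a coroutine library, and when several coroutines (greenlets) share one socket, gevent raises exactly this error. I changed the socket to a local variable inside the task, and the problem was solved.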
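gevent's check has a close analogue in Python's standard `asyncio`: a `StreamReader` raises `RuntimeError` if a second coroutine starts waiting on it while another one is already waiting. The sketch below swaps in `asyncio` for gevent (chosen only so it needs nothing beyond the standard library) and uses a local TCP server as a stand-in for the WebSocket endpoint; the names `_push_once` and `read_concurrently` are hypothetical. It shows the shared connection failing and one-connection-per-coroutine working, which is the same fix as making the socket local:

```python
import asyncio


async def _push_once(reader, writer):
    """Test server: wait briefly, push one line, then wait for the client to hang up."""
    await asyncio.sleep(0.05)
    writer.write(b"tick\n")
    await writer.drain()
    await reader.read()  # returns b"" once the client closes its side
    writer.close()


async def read_concurrently(share_connection):
    """Two coroutines each read one line, over one shared or two separate connections."""
    server = await asyncio.start_server(_push_once, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        n_conns = 1 if share_connection else 2
        conns = [await asyncio.open_connection("127.0.0.1", port) for _ in range(n_conns)]
        if share_connection:
            readers = [conns[0][0], conns[0][0]]  # both coroutines wait on one socket
        else:
            readers = [r for r, _ in conns]       # each coroutine owns its socket
        results = await asyncio.gather(*(r.readline() for r in readers),
                                       return_exceptions=True)
        for _, writer in conns:
            writer.close()
            await writer.wait_closed()
        return results
```

With `share_connection=True` the second result is a `RuntimeError` ("readuntil() called while another coroutine is already waiting for incoming data"); with `False` both coroutines receive their line.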


    The modified code:

    import gzip
    import json

    import websocket
    from locust import Locust, TaskSet, task

    WS_URL = "ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws"
    SUB_MSG = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'


    class WebSocketClient(object):
        def __init__(self, host):
            self.host = host


    class WebSocketLocust(Locust):
        def __init__(self, *args, **kwargs):
            super(WebSocketLocust, self).__init__(*args, **kwargs)
            self.client = WebSocketClient("172.31.15.85")


    class UserBehavior(TaskSet):
        @task(1)
        def buy(self):
            # Local socket: each simulated user (greenlet) opens its own
            # connection, so no two greenlets ever wait on the same socket.
            ws = websocket.WebSocket()
            try:
                ws.connect(WS_URL)
                ws.send(SUB_MSG)
                while True:
                    result = gzip.decompress(ws.recv()).decode('utf-8')
                    msg = json.loads(result)
                    if "ping" in msg:
                        # Answer the heartbeat, then re-subscribe.
                        ws.send(json.dumps({"pong": msg["ping"]}))
                        ws.send(SUB_MSG)
                    # Pushed data is no longer written to a file in this version.
            except Exception as e:
                print("error is:", e)
            finally:
                ws.close()


    class ApiUser(WebSocketLocust):
        task_set = UserBehavior
        min_wait = 100
        max_wait = 200
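Because the task in these scripts loops forever and never reports a request, Locust's statistics stay empty. Locust's documented way to test other protocols is a custom client that times each call and fires Locust's request events. A minimal, Locust-free sketch of such a wrapper; the class `TimedClient` is hypothetical, and its callbacks take the same keyword arguments as pre-1.0 Locust's `events.request_success.fire` and `events.request_failure.fire`:

```python
import time


class TimedClient:
    """Wrap an object with send()/recv() and report each call's latency.

    on_success(request_type, name, response_time, response_length) and
    on_failure(request_type, name, response_time, exception) are plain
    callables here, so the sketch runs without Locust installed.
    """

    def __init__(self, ws, on_success, on_failure, request_type="websocket"):
        self.ws = ws
        self.on_success = on_success
        self.on_failure = on_failure
        self.request_type = request_type

    @staticmethod
    def _ms_since(start):
        return int((time.perf_counter() - start) * 1000)

    def _timed(self, name, call, size_of):
        start = time.perf_counter()
        try:
            result = call()
        except Exception as e:
            self.on_failure(request_type=self.request_type, name=name,
                            response_time=self._ms_since(start), exception=e)
            raise
        self.on_success(request_type=self.request_type, name=name,
                        response_time=self._ms_since(start),
                        response_length=size_of(result))
        return result

    def send(self, payload):
        return self._timed("send", lambda: self.ws.send(payload), lambda _: len(payload))

    def recv(self):
        return self._timed("recv", self.ws.recv, len)
```

Under pre-1.0 Locust, passing `events.request_success.fire` and `events.request_failure.fire` as the two callbacks would put every `send`/`recv` into the web UI's statistics.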

    Once the load test started and the number of users ramped up, the load generator began reporting errors: HTTP 500 and 502.

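    These errors mean the protocol handshake failed. The back-end market-data application servers were also logging large numbers of errors.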
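A 500 or 502 comes back in place of the `101 Switching Protocols` response that RFC 6455 requires for a successful opening handshake, so the upgrade was rejected before any WebSocket frames were exchanged; a 502 in particular usually comes from a proxy or gateway that got no valid response from the server behind it. If the load script keeps the raw head of each handshake response, a hypothetical tally like this separates the failures by status code:

```python
from collections import Counter


def handshake_status(response_head: bytes) -> int:
    """Status code from the first line of a handshake response,
    e.g. b"HTTP/1.1 101 Switching Protocols\\r\\n..." gives 101."""
    status_line = response_head.split(b"\r\n", 1)[0]
    parts = status_line.split(b" ", 2)
    if len(parts) < 2 or not parts[0].startswith(b"HTTP/"):
        raise ValueError("not an HTTP response: %r" % status_line)
    return int(parts[1])


def tally(responses):
    """Return (successful upgrades, Counter of failing status codes)."""
    counts = Counter(handshake_status(r) for r in responses)
    ok = counts.pop(101, 0)  # 101 is the only successful handshake status
    return ok, counts
```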

    Checking the server, we found that the maximum number of open files was 1024, and raised it to 65535. The steps we used were:

    Step 1: edit /etc/sysctl.conf and add the following line:

      net.ipv4.ip_local_port_range = 1024 65000

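      This sets the system's local port range to 1024 through 65000. Note that the minimum of the local port range should be at least 1024, and the maximum must not exceed 65535. Save the file after editing.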

      Step 2: run the sysctl command:

      [speng@as4 ~]$ sysctl -p

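      If the command prints no errors, the new local port range is in effect. With the range above, a single process can in theory hold more than 60,000 simultaneous TCP client connections to a given destination IP and port.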
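The sysctl change above widens the ephemeral port range; the 1024 open-file limit is a separate per-process limit (`RLIMIT_NOFILE`, what `ulimit -n` shows), normally raised with `ulimit -n` or `/etc/security/limits.conf`. Since every socket is a file descriptor, a load-generator process can also check and raise its own soft limit. A minimal sketch, assuming a Unix system; the helper name `raise_nofile_limit` is hypothetical:

```python
import resource


def raise_nofile_limit(target=65535):
    """Raise this process's open-file soft limit toward `target`.

    Without root privileges the soft limit can only go up to the hard limit,
    so the result is capped there. Returns the (soft, hard) pair now in force.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard == resource.RLIM_INFINITY:
        new_soft = target
    else:
        new_soft = min(target, hard)
    if new_soft > soft:
        resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
    return resource.getrlimit(resource.RLIMIT_NOFILE)
```

Calling it at the top of the load script avoids hitting the 1024-descriptor ceiling long before the port range matters.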

    After these changes we re-tested and still got the same errors, so we asked the developers to investigate the application.

  • Original post: https://www.cnblogs.com/devtest/p/9966465.html