zoukankan      html  css  js  c++  java
  • python locust 进行压力测试

    最近公司项目周期比较赶, 项目是软硬结合,在缺少硬件的情况下,通过接口模拟设备上下架和购买情况,并进行压力测试,

    本次主要使用三个接口 分别是3个场景: 生成商品IP, 对商品进行上架, 消费者购买商品

    最大问题:是数据库是用ssh,只能用127.0.0.1去链接数据库,

    试过用ssh链接数据库, 用requests 去跑脚本没有问题,换上locust 就有问题

    最后使用putty作为代理链接 ,有个缺陷就是 链接时效性不强,经常要重新链接putty

    环境:

    win10  mysql  locust python3.7

    1.locust:

    Locust是一个用于可扩展的,分布式的,性能测试的,开源的,用Python编写框架/工具,它非常容易使用,也非常好学。

    它的主要思想就是模拟一群用户将访问你的网站。每个用户的行为由你编写的python代码定义,同时可以从Web界面中实时观察到用户的行为。

    Locust完全是事件驱动的,因此在单台机器上能够支持几千并发用户访问。

    与其它许多基于事件的应用相比,Locust并不使用回调,而是使用gevent,而gevent是基于协程的,可以用同步的方式来编写异步执行的代码。

    每个用户实际上运行在自己的greenlet中。

    2. 安装

    pip  install locust

     安装 pyzmq

     If you intend to run Locust distributed across multiple processes/machines, we recommend you to also install pyzmq.

     如果打算运行Locust 分布在多个进程/机器,需要安装pyzmq.

     通过pip命令安装。

     pip install pyzmq

    3.Locust主要由下面的几个库构成:

    1) gevent

    gevent是一种基于协程的Python网络库,它用到Greenlet提供的,封装了libevent事件循环的高层同步API。

    2) flask

    Python编写的轻量级Web应用框架。

    3) requests

    Python Http库

    4) msgpack-python

    MessagePack是一种快速、紧凑的二进制序列化格式,适用于类似JSON的数据格式。msgpack-python主要提供MessagePack数据序列化及反序列化的方法。

    5) six

    Python2和3兼容库,用来封装Python2和Python3之间的差异性

    6) pyzmq

    pyzmq是zeromq(一种通信队列)的Python绑定,主要用来实现Locust的分布式模式运行

    当我们在安装 Locust 时,它会检测我们当前的 Python 环境是否已经安装了这些库,如果没有安装,它会先把这些库一一装上。并且对这些库版本有要求,有些是必须等于某版本,有些是大于某版本。我们也可以事先把这些库全部按要求装好,再安装Locust时就会快上许多

    4. 脚本解读

    1、创建ScriptTasks()类继承TaskSet类:  用于定义测试业务。
    
    2、创建index()、about()、demo()方法分别表示一个行为,访问http://example.com。用@task() 装饰该方法为一个任务。1、2表示一个Locust实例被挑选执行的权重,数值越大,执行频率越高。在当前ScriptTasks()行为下的三个方法得执行比例为2:1:1
    
    3、WebsiteUser()类: 用于定义模拟用户。
    
    4、task_set :  指向一个定义了的用户行为类。
    
    5、host:   指定被测试应用的URL的地址
    
    6、min_wait :   用户执行任务之间等待时间的下界,单位:毫秒。
    
    7、max_wait :   用户执行任务之间等待时间的上界,单位:毫秒。

    5.执行脚本:

    web/UI 界面:

    if __name__ == '__main__':
        import os
        os.system("locust -f godemo.py --host=http://xx.api.xxxxx.net")

    或者在命令行:

    locust -f godemo.py --host=http:xxx.xxx.xx.net
    
    或者:
    locust -f godemo.py --H=http:xxx.xxx.xx.net

    no-web:

    dos进入Scripts目录下,执行  locust -f ****.py --csv=onetest --host=http://0.0.0.0:0000 --no-web -c10 -r10 -t2 

    (PS:-f 指定运行的py文件的名字,--csv 生成报告的名字,--host 测试的http服务的ip和port,--no-web 不用web启动,-c 设置虚拟用户数,  -r 设置每秒启动虚拟用户数, -t  设置运行时间)

    下面是脚本:

    创建一个方法保存在goconn,从数据库读取数据传入请求:

    import  pymysql
    from sshtunnel import SSHTunnelForwarder
    import random
    
    
    def conn():
    
        #本地通过putty链接数据库,在链接跳板机
        lcDB = pymysql.connect(host="127.0.0.1",port=8807, user="rt",passwd="qwqwqw12",db="go")
        cur = lcDB.cursor()
        #随机生成一个数字,作为查询的结果的结果数
        num = random.randint(0,10)
    
        sql = "select no from goods where operator_id =%s order by rand() limit %s"%(11, num)
        print(sql)
        rfid = []
    
        try:
    
            cur.execute(sql)
            data = cur.fetchall()
    
            for row in data:
                good_no = row[0]
                rfid.append(good_no)
                # print(good_no)
            return rfid
    
    
        except:
            print("Error")
    
        finally:
            #关闭连接
            lcDB.close()

    请求:

    import goconnfrom locust import HttpLocust,TaskSet,task
    
    class goDemo(TaskSet):
    
        def getRfid(self):
    
            gID = goconn.conn()
            data = {
                "goods_no[]": gID,
                "num": 1,
            }
            print("2")
            r = self.client.post("/test/api/XXX", data=data)
            rfid = eval(r.content)["result"]
            print(rfid)
            return rfid
    
        def stock(self,did, oid, rfid):
            data = {
                "device_id": did,
                "operator_id": oid,
                "data[add][]": rfid,
                "data[remove][]": "",
            }
            result = self.client.post("/test/api/XX2", data= data)
            print(result.content)
    
    
        def purchase(self,did,uid,rfid):
    
            data = {
                "device_id":did,
                "user_id":uid,
                "data[]":rfid,
            }
            req = self.client.post("/test/api/XXX3", data = data)
            res =eval(req.content)["result"]
            print(req.content)
    
        @task(1)
        def test_getrfid(self):
            self.getRfid()
    
        @task(3)
        def test_stock(self):
            rfid = self.getRfid()
            self.stock(28,11,rfid)
    
        @task(7)
        def test_purchase(self):
            rfid = self.getRfid()
            self.stock(28,11,rfid)
            self.purchase(28,172,rfid)
    
    
    class WebsiteUser(HttpLocust):
        task_set = goDemo
        min_wait = 3000
        max_wait = 5000

    测试结果:

     Number of users to simulate:设置模拟的用户总数,

    Hatch rate (users spawned/second):每秒启动的虚拟用户数 ,

    Start swarming:执行locust脚本

    UI界面:运行200个, 每秒启动2个用户:

     参考文章:

    http://www.pianshen.com/article/6404330705/

    关于性能好文章:

    https://www.cnblogs.com/botoo/p/7410283.html

    设计locust:

    参数化 ,关联等:

    http://www.cnblogs.com/ailiailan/p/9474973.html

  • 相关阅读:
    新博客第一篇,字符串 Unicode 转义
    C# 泛型方法的类型推断
    一个改进 LRU 算法的缓冲池 update 2013.7.15
    C# 判断类型间能否隐式或强制类型转换,以及开放泛型类型转换 update 2015.02.03
    C# 词法分析器(三)正则表达式
    java面试题(二)
    Map的迭代
    Spring aop切面插入事物回滚
    Log4J的配置
    js中将yyyyMMdd格式的日期转换
  • 原文地址:https://www.cnblogs.com/shoebill/p/10799575.html
Copyright © 2011-2022 走看看