由于人工智能的热度, python目前已经成为最受欢迎的编程语言,一度已经超越Java 。
本文将介绍开源的python 测试工具: locust
使用步骤:
1. 安装python 3.0以上版本
2. 安装Pip
3. 安装locust pip install locustio (windows系统下)
4. 阅读或者下载 locust 源码
一、Locust 的基本实现原理
服务端性能测试工具最核心的部分是压力发生器,核心要点有两个,一是真实模拟用户操作,二是模拟有效并发。
在Locust
测试框架中,测试场景是采用纯Python脚本。对于最常见的HTTP(S)
协议的系统,Locust
采用Python的requests
库作为客户端,而对于其它协议类型的系统,Locust
也提供了接口,只要我们能采用Python编写对应的请求客户端,就能方便地采用Locust
实现压力测试。从这个角度来说,Locust
可以用于压测任意类型的系统。
在模拟有效并发方面,Locust
的优势在于其摒弃了进程和线程,完全基于事件驱动,使用gevent
提供的非阻塞IO
和coroutine
来实现网络层的并发请求,因此即使是单台压力机也能产生数千并发请求数;再加上对分布式运行的支持,理论上来说,Locust
能在使用较少压力机的前提下支持极高并发数的测试。
二、 Locust 脚本编写
首先分析下官方demo脚本:
import random from locust import HttpLocust, TaskSet, task from pyquery import PyQuery class BrowseDocumentation(TaskSet): def on_start(self): # assume all users arrive at the index page self.index_page() self.urls_on_current_page = self.toc_urls @task(10) def index_page(self): r = self.client.get("/") pq = PyQuery(r.content) link_elements = pq(".toctree-wrapper a.internal") self.toc_urls = [ l.attrib["href"] for l in link_elements ] @task(50) def load_page(self, url=None): url = random.choice(self.toc_urls) r = self.client.get(url) pq = PyQuery(r.content) link_elements = pq("a.internal") self.urls_on_current_page = [ l.attrib["href"] for l in link_elements ] @task(30) def load_sub_page(self): url = random.choice(self.urls_on_current_page) r = self.client.get(url) class AwesomeUser(HttpLocust): task_set = BrowseDocumentation host = "http://docs.locust.io/en/latest/" # we assume someone who is browsing the Locust docs, # generally has a quite long waiting time (between # 20 and 600 seconds), since there's a bunch of text # on each page min_wait = 20 * 1000 max_wait = 600 * 1000
在这个示例中,定义了针对host=http://docs.locust.io/en/latest/ 网站的测试场景:先模拟用户登录系统,然后随机地访问首页(/
)和关于页面(/about/
),请求比例为2:1
;并且,在测试过程中,两次请求的间隔时间为20~600
秒间的随机值。
那么,如上Python脚本是如何表达出以上测试场景的呢?
从脚本中可以看出,脚本主要包含两个类,一个是WebsiteUser
(继承自HttpLocust
,而HttpLocust
继承自Locust
),另一个是WebsiteTasks
(继承自TaskSet
)。事实上,在Locust
的测试脚本中,所有业务测试场景都是在Locust
和TaskSet
两个类的继承子类中进行描述的。
Locust类
简单地说,Locust类
就好比是一群蝗虫,而每一只蝗虫就是一个类的实例。
相应的,TaskSet类
就好比是蝗虫的大脑,控制着蝗虫的具体行为,即实际业务场景测试对应的任务集。
在Locust类
中,具有一个client
属性,它对应着虚拟用户作为客户端所具备的请求能力,也就是我们常说的请求方法。
对于常见的HTTP(S)
协议,Locust
已经实现了HttpLocust
类,其client
属性绑定了HttpSession
类,而HttpSession
又继承自requests.Session
。因此在测试HTTP(S)
的Locust脚本
中,我们可以通过client
属性来使用Python requests
库的所有方法,包括GET/POST/HEAD/PUT/DELETE/PATCH
等,调用方式也与requests
完全一致。另外,由于requests.Session
的使用,因此client
的方法调用之间就自动具有了状态记忆的功能。常见的场景就是,在登录系统后可以维持登录状态的Session
,从而后续HTTP请求操作都能带上登录态。
而对于HTTP(S)
以外的协议,我们同样可以使用Locust
进行测试,只是需要我们自行实现客户端。在客户端的具体实现上,可通过注册事件的方式,在请求成功时触发events.request_success
,在请求失败时触发events.request_failure
即可。然后创建一个继承自Locust类
的类,对其设置一个client
属性并与我们实现的客户端进行绑定。后续,我们就可以像使用HttpLocust类
一样,测试其它协议类型的系统。
原理就是这样简单!
在Locust类
中,除了client
属性,还有几个属性需要关注下:
task_set
: 指向一个TaskSet
类,TaskSet
类定义了用户的任务信息,该属性为必填;max_wait/min_wait
: 每个用户执行两个任务间隔时间的上下限(毫秒),具体数值在上下限中随机取值,若不指定则默认间隔时间固定为1秒;host
:被测系统的host,当在终端中启动locust
时没有指定--host
参数时才会用到;weight
:同时运行多个Locust类
时会用到,用于控制不同类型任务的执行权重。
测试开始后,每个虚拟用户(Locust实例
)的运行逻辑都会遵循如下规律:
- 先执行
WebsiteTasks
中的on_start
(只执行一次),作为初始化; - 从
WebsiteTasks
中随机挑选(如果定义了任务间的权重关系,那么就是按照权重关系随机挑选)一个任务执行; - 根据
Locust类
中min_wait
和max_wait
定义的间隔时间范围(如果TaskSet类
中也定义了min_wait
或者max_wait
,以TaskSet
中的优先),在时间范围中随机取一个值,休眠等待; - 重复
2~3
步骤,直至测试任务终止。
TaskSet类
性能测试工具要模拟用户的业务操作,就需要通过脚本模拟用户的行为。在前面的比喻中说到,TaskSet类
好比蝗虫的大脑,控制着蝗虫的具体行为。
具体地,TaskSet类
实现了虚拟用户所执行任务的调度算法,包括规划任务执行顺序(schedule_task
)、挑选下一个任务(execute_next_task
)、执行任务(execute_task
)、休眠等待(wait
)、中断控制(interrupt
)等等。在此基础上,我们就可以在TaskSet
子类中采用非常简洁的方式来描述虚拟用户的业务测试场景,对虚拟用户的所有行为(任务)进行组织和描述,并可以对不同任务的权重进行配置。
在TaskSet
子类中定义任务信息时,可以采取两种方式,@task装饰器
和tasks属性
。
采用@task装饰器
定义任务信息时,描述形式如下:
from locust import TaskSet, task class UserBehavior(TaskSet): @task(1) def test_job1(self): self.client.get('/job1') @task(2) def test_job2(self): self.client.get('/job2')
采用tasks属性
定义任务信息时,描述形式如下:
from locust import TaskSet def test_job1(obj): obj.client.get('/job1') def test_job2(obj): obj.client.get('/job2') class UserBehavior(TaskSet): tasks = {test_job1:1, test_job2:2} # tasks = [(test_job1,1), (test_job1,2)] # 两种方式等价
Locust 用例高级用法
关联
在某些请求中,需要携带之前从Server端返回的参数,因此在构造请求时需要先从之前的Response中提取出所需的参数。
from lxml import etree from locust import TaskSet, task, HttpLocust class UserBehavior(TaskSet): @staticmethod def get_session(html): tree = etree.HTML(html) return tree.xpath("//div[@class='btnbox']/input[@name='session']/@value")[0] @task(10) def test_login(self): html = self.client.get('/login').text username = 'user@compay.com' password = '123456' session = self.get_session(html) payload = { 'username': username, 'password': password, 'session': session } self.client.post('/login', data=payload) class WebsiteUser(HttpLocust): host = 'http://debugtalk.com' task_set = UserBehavior min_wait = 1000 max_wait = 3000
参数化
循环取数据,数据可重复使用
所有并发虚拟用户共享同一份测试数据,各虚拟用户在数据列表中循环取值。
例如,模拟3用户并发请求网页,总共有100个URL地址,每个虚拟用户都会依次循环加载这100个URL地址;加载示例如下表所示。
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
from locust import TaskSet, task, HttpLocust class UserBehavior(TaskSet): def on_start(self): self.index = 0 @task def test_visit(self): url = self.locust.share_data[self.index] print('visit url: %s' % url) self.index = (self.index + 1) % len(self.locust.share_data) self.client.get(url) class WebsiteUser(HttpLocust): host = 'http://debugtalk.com' task_set = UserBehavior share_data = ['url1', 'url2', 'url3', 'url4', 'url5'] min_wait = 1000 max_wait = 3000
保证并发测试数据唯一性,不循环取数据
所有并发虚拟用户共享同一份测试数据,并且保证虚拟用户使用的数据不重复。
例如,模拟3用户并发注册账号,总共有9个账号,要求注册账号不重复,注册完毕后结束测试;加载示例如下表所示。
from locust import TaskSet, task, HttpLocust import queue class UserBehavior(TaskSet): @task def test_register(self): try: data = self.locust.user_data_queue.get() except queue.Empty: print('account data run out, test ended.') exit(0) print('register with user: {}, pwd: {}' .format(data['username'], data['password'])) payload = { 'username': data['username'], 'password': data['password'] } self.client.post('/register', data=payload) class WebsiteUser(HttpLocust): host = 'http://debugtalk.com' task_set = UserBehavior user_data_queue = queue.Queue() for index in range(100): data = { "username": "test%04d" % index, "password": "pwd%04d" % index, "email": "test%04d@debugtalk.test" % index, "phone": "186%08d" % index, } user_data_queue.put_nowait(data) min_wait = 1000 max_wait = 3000
保证并发测试数据唯一性,循环取数据
所有并发虚拟用户共享同一份测试数据,保证并发虚拟用户使用的数据不重复,并且数据可循环重复使用。
例如,模拟3用户并发登录账号,总共有9个账号,要求并发登录账号不相同,但数据可循环使用;加载示例如下表所示。
from locust import TaskSet, task, HttpLocust import queue class UserBehavior(TaskSet): @task def test_register(self): try: data = self.locust.user_data_queue.get() except queue.Empty: print('account data run out, test ended.') exit(0) print('register with user: {}, pwd: {}' .format(data['username'], data['password'])) payload = { 'username': data['username'], 'password': data['password'] } self.client.post('/register', data=payload) self.locust.user_data_queue.put_nowait(data) class WebsiteUser(HttpLocust): host = 'http://debugtalk.com' task_set = UserBehavior user_data_queue = queue.Queue() for index in range(100): data = { "username": "test%04d" % index, "password": "pwd%04d" % index, "email": "test%04d@debugtalk.test" % index, "phone": "186%08d" % index, } user_data_queue.put_nowait(data) min_wait = 1000 max_wait = 3000
三、Locust运行模式
运行Locust
时,通常会使用到两种运行模式:单进程运行和多进程分布式运行。
单进程运行模式
Locust
所有的虚拟并发用户均运行在单个Python
进程中,具体从使用形式上,又分为no_web
和web
两种形式。该种模式由于单进程的原因,并不能完全发挥压力机所有处理器的能力,因此主要用于调试脚本和小并发压测的情况。
当并发压力要求较高时,就需要用到Locust
的多进程分布式运行模式。从字面意思上看,大家可能第一反应就是多台压力机同时运行,每台压力机分担负载一部分的压力生成。的确,Locust
支持任意多台压力机(一主多从)的分布式运行模式,但这里说到的多进程分布式运行模式还有另外一种情况,就是在同一台压力机上开启多个slave
的情况。这是因为当前阶段大多数计算机的CPU都是多处理器(multiple processor cores
),单进程运行模式下只能用到一个处理器的能力,而通过在一台压力机上运行多个slave
,就能调用多个处理器的能力了。比较好的做法是,如果一台压力机有N
个处理器内核,那么就在这台压力机上启动一个master
,N
个slave
。当然,我们也可以启动N
的倍数个slave
,但是根据我的试验数据,效果跟N
个差不多,因此只需要启动N
个slave
即可。
Locust
是通过在Terminal
中执行命令进行启动的,通用的参数有如下几个:
-H, --host
:被测系统的host
,若在Terminal
中不进行指定,就需要在Locust
子类中通过host
参数进行指定;--no-web
参数,指定并发数(-c
)和总执行次数(-n
)-f, --locustfile
:指定执行的Locust
脚本文件;
在此基础上,当我们想要调试Locust
脚本时,就可以在脚本中需要调试的地方通过print
打印日志,然后将并发数和总执行次数都指定为1
$ locust -f locustfile.py --no-web -c 1 -n 1
no_web
如果采用no_web
形式,则需使用--no-web
参数,并会用到如下几个参数。
-c, --clients
:指定并发用户数;-n, --num-request
:指定总执行测试次数;-r, --hatch-rate
:指定并发加压速率,默认值位1。
示例:
$ locust -H http://debugtalk.com -f demo.py --no-web -c 1 -n 2
web
如果采用web
形式,,则通常情况下无需指定其它额外参数,Locust
默认采用8089
端口启动web
;如果要使用其它端口,就可以使用如下参数进行指定。
-P, --port
:指定web端口,默认为8089
.
$ locust -H http://XXXX.com -f demo.py
如果Locust
运行在本机,在浏览器中访问http://localhost:8089
即可进入Locust
的Web管理页面;如果Locust
运行在其它机器上,那么在浏览器中访问http://locust_machine_ip:8089
即可。
在Locust
的Web管理页面中,需要配置的参数只有两个:
Number of users to simulate
: 设置并发用户数,对应中no_web
模式的-c, --clients
参数;Hatch rate (users spawned/second)
: 启动虚拟用户的速率,对应着no_web
模式的-r, --hatch-rate
参数,默认为1。
多进程分布式运行
不管是单机多进程
,还是多机负载
模式,运行方式都是一样的,都是先运行一个master
,再启动多个slave
。
启动master
时,需要使用--master
参数;同样的,如果要使用8089
以外的端口,还需要使用-P, --port
参数。
$ locust -H http://xxxx.com -f demo.py --master --port=8088
master
启动后,还需要启动slave
才能执行测试任务。
启动slave
时需要使用--slave
参数;在slave
中,就不需要再指定端口了。
$ locust -H http://xxxx.com -f demo.py --slave
如果slave
与master
不在同一台机器上,还需要通过--master-host
参数再指定master
的IP地址。
$ locust -H http://xxxx.com -f demo.py --slave --master-host=<locust_machine_ip>
master
和slave
都启动完毕后,就可以在浏览器中通过http://locust_machine_ip:8089
进入Locust
的Web管理页面了。使用方式跟单进程web
形式完全相同,只是此时是通过多进程负载来生成并发压力,在web
管理界面中也能看到实际的slave
数量。
注意:
locust虽然使用方便,但是加压性能和响应时间上面还是有差距的,如果项目有非常大的并发加压请求,可以选择wrk
对比方法与结果:
可以准备两台服务器,服务器A作为施压方,服务器B作为承压方
服务器B上简单的运行一个nginx服务就行了
服务器A上可以安装一些常用的压测工具,比如locust、ab、wrk
我当时测下来,施压能力上 wrk > golang >> ab > locust
因为locust一个进程只使用一核CPU,所以用locust压测时,必须使用主从分布式(zeromq通讯)模式,并根据服务器CPU核数来起slave节点数
wrk约为55K QPS
golang net/http 约 45K QPS
ab 大约 15K QPS
locust 最差,而且response time明显比较长