from locust import TaskSet,task,HttpLocust from common.redisCon import redis_clusters import queue class register(TaskSet): @task def register(self): data = {} data['name'] = '龙雄' data['idcard'] = '430922199825685857' data['carnum'] = '湘A7X72J' data['pwd'] = 'bcb15f821479b4d5772bd0ca866c00ad5f926e3580720659cc80d39c9d09802a' self.data = data try: phone = self.locust.telqueue.get() except: print("no data exist") exit(0) header = { "Accept": "application/json, text/plain, */*" } json = {"address": {"province": "陕西省", "country": "陕西省西安市碑林区雁塔北路4号靠近陕西工艺美术馆(金都国际大厦)", "city": "西安市"}, "password": self.data['pwd'], "vehicleType": "{"isCertificate":"有证","isCold":"","isElectric":"","isTailBoard":"带尾板","length":"9.6米","name":"厢式货车"}", "registerChannel": "android", "userType": "driver", "usualRunArea": "陕西省,西安市,碑林区", "name": self.data['name'], "phone": phone, "verifyCode": '888888', "plateNum": self.data['carnum'], "idcardNum": self.data['idcard'] } r = self.client.post('/driver/security/regist',headers=header,json=json,verify=False) print("=========================================================================================") print(json) print(r.text) assert r.status_code == 200 class test_run(HttpLocust): host = 'http://192.168.xx.xx' task_set = register phonelist = list(str(18800002000 + i) for i in range(0, 100)) redis_clusters(int(phonelist[0]), int(phonelist[99])) # print(phonelist) telqueue = queue.Queue() for i in phonelist: telqueue.put_nowait(i) if __name__ == "__main__": import os os.system("locust -f register.py")
1、运行该程序;
2、访问localhost:8089,设置虚拟用户数,与每秒用户并发数,确定后运行
3、locust会自动生成测试报告