zoukankan      html  css  js  c++  java
  • python接口多线程压测

      工作中需要,编写一个python脚本放置在服务器上进行压测,压测利用多线程。第一次使用不是很理解,有不对的地方请指教。

      1 #!/usr/bin/env python
      2 # -*- coding:utf-8 -*-
      3 
      4 import requests, time, threading, random,os,codecs
      5 
      6 
      7 class Presstest(object):
      8     headers = {
      9         'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36',
     10         'Content-Type': 'application/json; charset=UTF-8',
     11     }
     12 
     13     def __init__(self, press_url):
     14         self.press_url = press_url
     15         self.headers = self.headers
     16 
     17 
     18     def testinterface(self):
     19         '''压测接口'''
     20         data = {
     21             "taskDesc": "000000",
     22             "timeRange": [
     23                 {
     24                     "startTime": "2021-01-05 15:00:00",
     25                     "endTime": "2021-12-21 23:59:59"
     26                 }
     27             ],
     28             "processId": 1,
     29             "repeatTimes": 1,
     30             "jsonData": [
     31                 {
     32                     "phoneNum1": str(140)+str(random.randint(10000000,99999999)),
     33                     "phoneNum2": "",
     34                     "phoneNum3": "",
     35                     "userData": "{"信用等级":"D6","地址":"01街","资金":"100","欠费月份":"2019-10-01"}",
     36                     "phoneNum4": "",
     37                     "phoneNum5": "",
     38                     "custId": "we_" + str(codecs.encode(os.urandom(8), 'hex').decode()),
     39                     "username": "小范"
     40                 }
     41             ],
     42             "projectSn": "1048",
     43             "requestId": "wek_e00000004",
     44             "taskName": "we_e00000004",
     45             "enterpriseID": "200000110",
     46             "token": "C7AA14C74225CF84918F5B2DF8BBD4FB"
     47         }
     48         print(data)
     49         global ERROR_NUM
     50         try:
     51             reponse = requests.post(self.press_url, headers=self.headers,json=data)
     52             result = reponse.json()
     53             if result["code"] != 0:
     54                 print(result)
     55                 ERROR_NUM += 1
     56         except Exception as e:
     57             print(e)
     58             ERROR_NUM += 1
     59 
     60     def testonework(self):
     61         '''一次并发处理单个任务'''
     62         i = 0
     63         while i < ONE_WORKER_NUM:
     64             i += 1
     65             self.testinterface()
     66         time.sleep(LOOP_SLEEP)
     67 
     68     def run(self):
     69         '''使用多线程进程并发测试'''
     70         t1 = time.time()
     71         Threads = []
     72 
     73         for i in range(THREAD_NUM):
     74             t = threading.Thread(target=self.testonework, name="T" + str(i))
     75             t.setDaemon(True)
     76             Threads.append(t)
     77 
     78         for t in Threads:
     79             t.start()
     80         for t in Threads:
     81             t.join()
     82         t2 = time.time()
     83 
     84         print("===============压测结果===================")
     85         print("URL:", self.press_url)
     86         print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", THREAD_NUM * ONE_WORKER_NUM)
     87         print("总耗时(秒):", t2 - t1)
     88         print("每次请求耗时(秒):", (t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM))
     89         print("每秒承载请求数:", 1 / ((t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM)))
     90         print("错误数量:", ERROR_NUM)
     91 
     92 
     93 if __name__ == '__main__':
     94     press_url = 'http://127.0.0.1:6005/xxxx/xxx/xxxxx'
     95 
     96     THREAD_NUM = 2  # 并发线程总数
     97     ONE_WORKER_NUM = 5  # 每个线程的循环次数
     98     LOOP_SLEEP = 0.01  # 每次请求时间间隔(秒)
     99     ERROR_NUM = 0  # 出错数
    100 
    101     obj = Presstest(press_url=press_url)
    102     obj.run()
  • 相关阅读:
    个人总结
    第十六周学习进度条
    第二阶段冲刺第十天
    第二阶段冲刺第九天
    分层体系架构模式
    安卓语音识别
    AndroidStudio中导入jar包的方法
    Android中控件之间添加分割线
    按照分层设计理念,完成《XXX需求征集系统》的概念结构设计
    在Eclipse下搭建Hibernate框架(加载hibernate工具插件,离线)
  • 原文地址:https://www.cnblogs.com/mtfan01/p/14628320.html
Copyright © 2011-2022 走看看