- 前置:线上抽奖系统
- 场景:2000用户,分别请求接口,查看开奖概率是否符合预期
- 思路:单个用户for循环请求,跑了10+min才跑了3分之1,中途还挂了一次。效率太慢了,,考虑多线程实现代码。
import requests import threading def call_post(_number, token=None): header = { 'Content-Type': 'application/x-www-form-urlencoded', 'x-token': token, } data = requests.post('url', headers=header) print('线程' + str(_number) + ':' + str(data.status_code)) data_file = data.json() print(data_file) with open("content.txt", "a") as f: f.write(str(data_file) + " ") def thread(_number, msg): """ :param _number: 每次从列表取的数量 :param msg: 线程的数量 :return: 无 """ token_list = file_token("token.txt", "r") n = 0 while _number * n < len(token_list): a = token_list[_number * n:_number * n + _number] threads = [] for s in range(0, msg): token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI' + a[s] t = threading.Thread(target=call_post, args=(s, token)) threads.append(t) n = n + 1 for t in threads: t.start() for t in threads: t.join() if __name__ == '__main__': thread(4, 4)