思路:启动max_workers个workers(线程),每个线程处理一份输入数据。
如果自己统计,那还需要对下边的脚本进行扩展。
如果搭配grafana等监控工具使用,那压测脚本只负责疯狂发请求就好了。
import concurrent.futures
def parallel_process(client_config, inputs, max_workers):
PoolExecutor = concurrent.futures.ThreadPoolExecutor
# 这里也可以改成进程。PoolExecutor = concurrent.futures.ProcessPoolExecutor
with PoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(single_process, *(client_config, inputs)) for _ in range(max_workers)]
# 每个future.result()返回的就是每个single_process的responses。
[future.result() for future in concurrent.futures.as_completed(futures)]
def single_process(client_config, input_file):
# 自己按需实现create_client,比如返回一个http或者rpc client。
client = create_client(client_config)
# 用client的方法处理每个input,比如这个方法叫process。
responses = [ client.process(input) for input in inputs ]
# 如果需要统计latency等,可以在这里写代码统计下。
return responses