zoukankan      html  css  js  c++  java
  • asyncio aiohttp限制并发数量和超时时间

    转载:https://www.jianshu.com/p/6f8980cf0948

    主要参考参数设置的一些问题

    import asyncio
    import random
    import traceback
    
    from aiohttp import ClientSession, TCPConnector, client_exceptions
    import time
    
    URL = 'http://127.0.0.1:5000/?delay={}'
    
    
    async def fetch(session, i):
        dly = random.randint(1,8)
        url = URL.format(dly)
        start_time = time.time()
        try:
            async with session.get(url=url) as response:
                r = await response.read()
                end_time = time.time()
                cost = end_time - start_time
                msg = "第{}个查询请求,花费时间: {}s, 返回信息: {}
    ".format(i, cost, r.decode('unicode-escape'))
                print("running %d" % i, msg)
        except client_exceptions.ServerTimeoutError as timeout_error:
            print("request timeout error: {}, url: {}".format(timeout_error, url))
        except Exception as e:
            print("request unknown error: {}".format(traceback.format_exc()))
    
    
    async def chunks(sem, session, i):
        """
        限制并发数
        """
        # 使用Semaphore, 它会在第一批400个请求发出且返回结果(是否等待返回结果取决于你的fetch方法的定义)后
        # 检查本地TCP连接池(最大400个)的空闲数(连接池某个插槽是否空闲,在这里,取决于请求是否返回)
        # 有空闲插槽,就PUT入一个请求并发出(完全不同于Jmeter的rame up in period的线性发起机制).
        # 所以,在结果log里,你会看到第一批请求(开始时间)是同一秒发起,而后面的则完全取决于服务器的吞吐量
        async with sem:
            await fetch(session, i)
    
    
    async def run(num):
        tasks = []
        # Semaphore, 相当于基于服务器的处理速度和测试客户端的硬件条件,一批批的发
        # 直至发送完全部(下面定义的400)
        sem = asyncio.Semaphore(400)
        # 创建session,且对本地的TCP连接做限制limit=400(不做限制limit=0)
        # 超时时间指定
        # total:全部请求最终完成时间
        # connect: aiohttp从本机连接池里取出一个将要进行的请求的时间
        # sock_connect:单个请求连接到服务器的时间
        # sock_read:单个请求从服务器返回的时间
        import aiohttp
        timeout = aiohttp.ClientTimeout(total=330, connect=2, sock_connect=15, sock_read=10)
        async with ClientSession(connector=TCPConnector(limit=400), timeout=timeout) as session:
            for i in range(0, num):
                # 如果是分批的发,就使用并传递Semaphore
                task = asyncio.ensure_future(
                    chunks(sem, session, i))
                tasks.append(task)
            responses = asyncio.gather(*tasks)
            await responses
    
    
    start = time.time()
    number = 380
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(run(number))
    loop.run_until_complete(future)
    end = time.time()
    total = end - start
    with open("log", "a+", encoding="utf-8") as f:
        f.write('总耗时:{}秒,平均速度:{}秒
    '.format(total, total / number))

     更新:

    如果超时,limit=400,验证第一次同时发起400个请求

    import time
    import queue
    import random
    import asyncio
    import traceback
    import collections
    
    from aiohttp import ClientSession, TCPConnector, client_exceptions, ClientTimeout
    
    queue_data = queue.Queue()
    timeout_domains = []
    unknown_error_domains = []
    start_time_list = []
    
    
    async def fetch(session, n, url):
        """
        :param session:  aiohttp.ClientSession
        :param n: task编号
        :param url: 请求url
        """
        start_time = time.time()
        # noinspection PyBroadException
        try:
            async with session.get(url=url, verify_ssl=False) as response:
                r = await response.read()
                end_time = time.time()
                cost = end_time - start_time
                msg = "第{}个查询请求,花费时间: {}s, 返回信息: {}
    ".format(n, cost, r.decode('unicode-escape'))
                # print(msg)
                queue_data.put(1)
        except client_exceptions.ServerTimeoutError as timeout_error:
            print("request timeout error: {}, url: {}".format(timeout_error, url))
            timeout_domains.append(url)
        except Exception:
            print("request unknown error: {}".format(traceback.format_exc()))
            unknown_error_domains.append(url)
        start_time_list.append(str(start_time).split(".")[0])
    
    
    async def chunks(sem, session, i, url):
        """
        限制并发数
        """
        async with sem:
            await fetch(session, i, url)
    
    
    def get_domains():
        urls = []
        for _ in range(1000):
            urls.append("http://127.0.0.1:5000/?delay={}".format(random.randint(1, 8)))
        return urls
    
    
    async def main(urls):
        sem = asyncio.Semaphore(400)
        timeout = ClientTimeout(total=10, connect=2, sock_connect=15, sock_read=5)
        async with ClientSession(connector=TCPConnector(limit=400), timeout=timeout) as session:
            tasks = [asyncio.create_task(chunks(sem, session, index, url)) for index, url in enumerate(urls)]
            await asyncio.wait(tasks)
    
    
    if __name__ == '__main__':
        domains = get_domains()
        asyncio.run(main(domains))
        print("success number: {}, timeout number: {}, unknown_error number: {}".format(queue_data.qsize(),
                                                                                        len(timeout_domains),
                                                                                        len(unknown_error_domains)))
    
        print(sorted(collections.Counter(start_time_list).items(), key=lambda item:item[0]))
        # 1. 没有超时的,第一批400个同一秒发起, 再往后就看response相应与读取速度
        # success number: 1000, timeout number: 0, unknown_error number: 0
        # [('1593246892', 400), ('1593246894', 48), ('1593246895', 55), ('1593246896', 55), ('1593246897', 76),
        #  ('1593246898', 74), ('1593246899', 90), ('1593246900', 96), ('1593246901', 106)]
        # 2. 有超时的
        # success number: 517, timeout number: 483, unknown_error number: 0
        # [('1593248067', 400), ('1593248068', 36), ('1593248069', 43), ('1593248070', 75), ('1593248071', 64),
        # ('1593248072', 168), ('1593248073', 126), ('1593248074', 69), ('1593248075', 19)]
  • 相关阅读:
    编程语言
    MySQL之常用函数
    Java常用工具类
    数据结构
    Java对接SAP平台接口
    Maven项目依赖管理工具
    Java设计模式--抽象工厂
    Java基础--抽象类与接口
    Java集合--ArrayList遍历删除元素
    Java注解(Annotation )--结合拦截器实现是否登录验证
  • 原文地址:https://www.cnblogs.com/zhzhlong/p/13198320.html
Copyright © 2011-2022 走看看