zoukankan      html  css  js  c++  java
  • aiohttp

    import aiohttp
    import asyncio
    async def fetch(session, url):
       async with session.get(url) as response:
           return await response.text(), response.status
    async def main():
       async with aiohttp.ClientSession() as session:
           html, status = await fetch(session, 'https://cuiqingcai.com')
           print(f'html: {html[:100]}...')
           print(f'status: {status}')
    if __name__ == '__main__':
       loop = asyncio.get_event_loop()
       loop.run_until_complete(main())

    import aiohttp
    import asyncio
    async def main():
       params = {'name': 'germey', 'age': 25}
       async with aiohttp.ClientSession() as session:
           async with session.get('https://httpbin.org/get', params=params) as response:
               print(await response.text())
    if __name__ == '__main__':
       asyncio.get_event_loop().run_until_complete(main())

    session.post('http://httpbin.org/post', data=b'data')
    session.put('http://httpbin.org/put', data=b'data')
    session.delete('http://httpbin.org/delete')
    session.head('http://httpbin.org/get')
    session.options('http://httpbin.org/get')
    session.patch('http://httpbin.org/patch', data=b'data')

    import aiohttp
    import asyncio
    async def main():
       data = {'name': 'germey', 'age': 25}
       async with aiohttp.ClientSession() as session:
           async with session.post('https://httpbin.org/post', data=data) as response:
               print(await response.text())
    if __name__ == '__main__':
       asyncio.get_event_loop().run_until_complete(main())
    async def main():
       data = {'name': 'germey', 'age': 25}
       async with aiohttp.ClientSession() as session:
           async with session.post('https://httpbin.org/post', json=data) as response:
               print(await response.text())

    import aiohttp
    import asyncio
    async def main():
       data = {'name': 'germey', 'age': 25}
       async with aiohttp.ClientSession() as session:
           async with session.post('https://httpbin.org/post', data=data) as response:
               print('status:', response.status)
               print('headers:', response.headers)
               print('body:', await response.text())
               print('bytes:', await response.read())
               print('json:', await response.json())
    if __name__ == '__main__':
       asyncio.get_event_loop().run_until_complete(main())

    设置超时

    import aiohttp
    import asyncio
    async def main():
       timeout = aiohttp.ClientTimeout(total=1)
       async with aiohttp.ClientSession(timeout=timeout) as session:
           async with session.get('https://httpbin.org/get') as response:
               print('status:', response.status)
    if __name__ == '__main__':
       asyncio.get_event_loop().run_until_complete(main())

    控制并发量

    import asyncio
    import aiohttp
    CONCURRENCY = 5
    URL = 'https://www.baidu.com'
    semaphore = asyncio.Semaphore(CONCURRENCY)
    session = None
    async def scrape_api():
       async with semaphore:
           print('scraping', URL)
           async with session.get(URL) as response:
               await asyncio.sleep(1)
               return await response.text()
    async def main():
       global session
       session = aiohttp.ClientSession()
       scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
       await asyncio.gather(*scrape_index_tasks)
    if __name__ == '__main__':
       asyncio.get_event_loop().run_until_complete(main())
  • 相关阅读:
    Java面向对象(02)--封装
    Java面向对象(01)--初识
    Java基础(10)--数组
    Java基础(09)--方法
    python中format输出常用的3种格式
    python 查找列表中重复元素以及重复元素的次数
    HttpRunner六:创建run.py文件,执行套件并生成测试报告
    HttpRunner五:关联参数的应用,获取上一个接口的返回值,用于当前接口的请求值
    HttpRunner四:testcases、testsuites以及参数化的使用
    HttpRunner中在case2中,使用作为请求参数和预期结果,预期结果中值显示是:LazyString($变量key)
  • 原文地址:https://www.cnblogs.com/angdh/p/14774827.html
Copyright © 2011-2022 走看看