zoukankan      html  css  js  c++  java
  • asyncio并发编程


    asyncio简介

    asyncio是Python3.4引入的一个用于异步IO的库,其主要功能如下

    1)包含各种特定系统实现的模块化事件循环

    2)传输和协议抽象

    3)对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持

    4)模仿futures模块但适用于事件循环使用的Future类

    5)基于yield from的协议和任务,可以让我们用顺序的方式编写并发代码

    6)必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池

    7)模仿threading模块中的同步原语、可以用在单线程内的协程之间


    事件循环

    基本使用

    来看一个示例

    # 异步编程三要素:事件循环+回调(驱动生成器)+IO多路复用
    # asyncio是python用于解决异步IO编程的一整套解决方案
    # 基于asyncio的框架:tornado、gevent、twisted(scrapy,django channels)
    # tornado(实现了web服务器),django+flask
    
    
    # 使用asyncio
    import asyncio
    import time
    
    
    async def get_html(url):
        print("start get url")
        # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
        await asyncio.sleep(2)	# 耗时操作一定要放到await里面
        # time.sleep(2)
        print("end get url")
    
    
    if __name__ == '__main__':
        start_time = time.time()
        loop = asyncio.get_event_loop()
        tasks = [get_html("http://www.baidu.com") for _ in range(10)]
        # loop.run_until_complete(get_html("http://www.baidu.com"))
        loop.run_until_complete(asyncio.wait(tasks))	# 一次执行多个任务
        print(time.time()-start_time)
    

    ### 获取协程的返回值
    import asyncio
    import time
    
    
    async def get_html(url):
        print("start get url")
        # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
        await asyncio.sleep(2)
        # time.sleep(2)
        print("end get url")
        return "loop_test"
    
    
    if __name__ == '__main__':
        start_time = time.time()
        loop = asyncio.get_event_loop()
        # ensure_future返回的是一个future对象
        get_future = asyncio.ensure_future(get_html("http://www.baidu.com"))
        loop.run_until_complete(get_future)		# run_until_complete也可以接收一个future对象
        print(get_future.result())	# loop_test
    

    还有一种方式就是通过task对象获取返回值
    import asyncio
    import time
    
    
    async def get_html(url):
        print("start get url")
        await asyncio.sleep(2)
        print("end get url")
        return "loop_test"
    
    
    if __name__ == '__main__':
        start_time = time.time()
        loop = asyncio.get_event_loop()
        task = loop.create_task(get_html("http://www.baidu.com"))
        loop.run_until_complete(task)
        print(get_future.result())	# loop_test
    

    ### 回调

    如果在执行完协程后需要回调,比如说我们需要发送邮件,可以使用task对象的add_done_callback方法

    import asyncio
    import time
    
    
    async def get_html(url):
        print("start get url")
        await asyncio.sleep(2)
        print("end get url")
        return "loop_test"
    
    
    def callback(future):   # 注意这里默认会将future传给回调函数,所以必须有一个参数接收
        print("send email to robin")
    
    
    if __name__ == '__main__':
        start_time = time.time()
        loop = asyncio.get_event_loop()
        task = loop.create_task(get_html("http://www.baidu.com"))
        task.add_done_callback(callback)
        loop.run_until_complete(task)
        print(task.result())
    

    这里有一个问题,如果回调函数需要参数的话就不行了,因为我们在调用的时候只传入了一个函数名,要传入参数,可以使用functool的partial函数
    import time
    from functools import partial
    
    
    async def get_html(url):
        print("start get url")
        # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
        await asyncio.sleep(2)
        print("end get url")
        return "loop_test"
    
    
    def callback(name, future):   # 注意这里callback自己的参数放在前面,future放在后面
        print("send email to %s" % name)
    
    
    if __name__ == '__main__':
        start_time = time.time()
        loop = asyncio.get_event_loop()
        task = loop.create_task(get_html("http://www.baidu.com"))
        task.add_done_callback(partial(callback, "sansa"))  # 将callback函数包裹起来
        loop.run_until_complete(task)
        print(task.result())
    

    ### wait和gather

    wait用于一次提交多个任务,gather与task在使用上基本相似,区别在于gather传参时需要使用*打散

    下面来具体说一下详细区别

    1)gather更加high-level(抽象层次更高)

    2)gather可以分组


    ```python import asyncio import time from functools import partial

    async def get_html(url):
    print("start get url")
    # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
    await asyncio.sleep(2)
    print("end get url")
    return "loop_test"

    def callback(name, future): # 注意这里默认会将future传给回调函数,所以必须有一个参数接收
    print("send email to %s" % name)

    if name == 'main':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task1 = [get_html("http://www.baidu.com") for _ in range(5)]
    task2 = [get_html("http://www.baidu.com") for _ in range(5)]
    loop.run_until_complete(asyncio.gather(*task1, *task2)) # 这里需要打散

    
    <br>
    
    后面可以改写成
    
    ```python
    if __name__ == '__main__':
        start_time = time.time()
        loop = asyncio.get_event_loop()
        task1 = [get_html("http://www.baidu.com") for _ in range(5)]
        task2 = [get_html("http://www.baidu.com") for _ in range(5)]
        group1 = asyncio.gather(*task1)
        group2 = asyncio.gather(*task2)
        loop.run_until_complete(asyncio.gather(group1, group2))
    

    ## task取消和子协程调用原理

    task取消

    import asyncio
    import time
    
    
    async def get_html(sleep_times):
        print("waiting")
        await asyncio.sleep(sleep_times)
        print("done after %s s" % sleep_times)
    
    
    if __name__ == '__main__':
        task1 = get_html(2)
        task2 = get_html(3)
        task3 = get_html(3)
    
        tasks = [task1, task2, task3]
        loop = asyncio.get_event_loop()
    
        try:
            loop.run_until_complete(asyncio.wait(tasks))
        except KeyboardInterrupt as e:	# 命令行运行时按ctrl+c终止程序
            all_tasks = asyncio.Task.all_tasks()	# 获取所有task
            for task in all_tasks:
                print("cancel task")
                print(task.cancel())	# 取消成功返回True
            loop.stop()
            loop.run_forever()
        finally:
            loop.close()    # 注意close与stop的区别
    

    ### 子协程

    下面来看一个官方示例

    import asyncio
    
    async def compute(x, y):
        print("Compute %s + %s ..." % (x, y))
        await asyncio.sleep(1.0)
        return x + y
    
    async def print_sum(x, y):
        result = await compute(x, y)
        print("%s + %s = %s" % (x, y, result))
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1, 2))
    loop.close()
    

    执行结果
    Compute 1 + 2 ...
    # 1 s
    1 + 2 = 3
    

    上述协程的时序图如下

    ../_images/tulip_coro.png


    ## 其他方法

    call_soon

    call_soon是立即启动的意思,传入一个函数和函数需要的参数

    import asyncio
    
    def callback(sleep_times):
        print("sleep %s seconds" % sleep_times)
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.call_soon(callback, 2)	
        loop.run_forever()
    

    执行结果
    sleep 2 seconds
    # 程序继续运行中...
    

    我们需要在协程结束后终止事件循环,因此需要再定义一个函数
    import asyncio
    
    def callback(sleep_times):
        print("sleep %s seconds" % sleep_times)
    
    def stoploop(loop):		# 终止循环
        loop.stop()
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.call_soon(callback, 2)     # 第一个参数是函数名,后面为动态参数
        loop.call_soon(stoploop, loop)
        loop.run_forever()
    

    call_later

    call_later按等待时间执行函数,会根据传入的时间排出一个顺序

    import asyncio
    
    
    def callback(sleep_times):
        print("sleep %s seconds" % sleep_times)
    
    
    def stoploop(loop):
        loop.stop()
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.call_later(1, callback, 1)     # 第一个参数是函数名,后面为动态参数
        loop.call_later(3, callback, 3)     # 第一个参数是函数名,后面为动态参数
        loop.call_later(2, callback, 2)     # 第一个参数是函数名,后面为动态参数
        # loop.call_soon(stoploop, loop)	# 这里如果使用call_soon那么loop会先stop,就看不到效果了
        loop.run_forever()
    

    执行结果
    sleep 1 seconds
    sleep 2 seconds
    sleep 3 seconds
    # 程序继续运行中...
    

    ### call_at

    call_at在指定时间执行

    import asyncio
    
    def callback(sleep_times):
        print("sleep %s seconds" % sleep_times)
    
    def stoploop(loop):
        loop.stop()
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        now = loop.time()
        loop.call_at(now+2, callback, 1)     # 第一个参数是函数名,后面为动态参数
        loop.call_at(now+3, callback, 3)     # 第一个参数是函数名,后面为动态参数
        loop.call_at(now+2, callback, 2)     # 第一个参数是函数名,后面为动态参数
        loop.run_forever()
    

    call_soon_threadsafe

    call_soon_threadsafe是线程安全的call_soon,涉及到多线程时,使用这个


    ThreadPollExecutor 和 asyncio完成阻塞io请求

    什么时候使用多线程:在协程中集成阻塞io

    在协程中不要放阻塞的代码, 但如果非要使用阻塞的代码, 就可以放到线程池中运行。

    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    import socket
    from urllib.parse import urlparse
    import time
    
    
    def get_url(url):
        # 通过socket请求html
        url = urlparse(url)
        host = url.netloc
        path = url.path
        if path == "":
            path = "/"
    
        # 建立socket连接
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect((host, 80))
    
        client.send("GET {} HTTP/1.1
    Host:{}
    Connection:close
    
    ".format(path, host).encode('utf8'))
        data = b""
        while True:
            d = client.recv(1024)
            if d:
                data += d
            else:
                break
        data = data.decode("utf8")
        html_data = data.split("
    
    ")[1]   # 把请求头信息去掉,只要网页内容
        print(html_data)
        client.close()
    
    
    if __name__ == '__main__':
        start_time = time.time()
        loop = asyncio.get_event_loop()
        executor = ThreadPoolExecutor(3)    # 线程池
        tasks = []
        for url in range(20):
            url = "http://shop.projectsedu.com/goods/{}/".format(url)
            # 阻塞的代码放到线程池
            task = loop.run_in_executor(executor, get_url, url)
            tasks.append(task)
        loop.run_until_complete(asyncio.wait(tasks))
        print("last time: %s" %(time.time()-start_time))
    

    ## asyncio发送http请求

    asyncio发送http请求可以通过asyncio的open_connection方法实现,open_connection方法返回reader和writer对象,分别用于读和写

    import asyncio
    from urllib.parse import urlparse
    import time
    
    
    async def get_url(url):
        url = urlparse(url)
        host = url.netloc
        path = url.path
        if path == "":
            path = '/'
    
        # 建立socket连接
        reader, writer = await asyncio.open_connection(host, 80)  # 协程 与服务端建立连接
        writer.write(
            "GET {} HTTP/1.1
    Host:{}
    Connection:close
    
    ".format(path, host).encode('utf8'))
        all_lines = []
        async for raw_line in reader:  # __aiter__ __anext__魔法方法
            line = raw_line.decode('utf8')
            all_lines.append(line)
        html = '
    '.join(all_lines)
        return html
    
    
    if __name__ == '__main__':
        import time
        start_time = time.time()
        loop = asyncio.get_event_loop()
        tasks = []
        for url in range(20):
            url = "http://shop.projectsedu.com/goods/{}/".format(url)
            tasks.append(get_url(url))
        loop.run_until_complete(asyncio.wait(tasks))
        print("last time: %s" % (time.time() - start_time))
    

    如果我们需要获取协程执行后的结果,我们可以把future对象放入tasks里面,然后通过future获取result

    修改后半段代码

    if __name__ == '__main__':
        import time
        start_time = time.time()
        loop = asyncio.get_event_loop()
        tasks = []
        for url in range(20):
            url = "http://shop.projectsedu.com/goods/{}/".format(url)
            tasks.append(asyncio.ensure_future(get_url(url)))	# future对象
        loop.run_until_complete(asyncio.wait(tasks))
        for task in tasks:
            print(task.result())	# 通过future对象获取结果
        print("last time: %s" % (time.time() - start_time))
    

    上面的是所有协程都执行完后再获取结果,如果需要执行完一个马上获取结果,可以使用as_completed方法

    import asyncio
    from urllib.parse import urlparse
    import time
    
    
    async def get_url(url):
        url = urlparse(url)
        host = url.netloc
        path = url.path
        if path == "":
            path = '/'
    
        # 建立socket连接
        reader, writer = await asyncio.open_connection(host, 80)  # 协程 与服务端建立连接
        writer.write(
            "GET {} HTTP/1.1
    Host:{}
    Connection:close
    
    ".format(path, host).encode('utf8'))
        all_lines = []
        async for raw_line in reader:  # __aiter__ __anext__魔法方法
            line = raw_line.decode('utf8')
            all_lines.append(line)
        html = '
    '.join(all_lines)
        return html
    
    
    async def main():
        tasks = []
        for url in range(20):
            url = "http://shop.projectsedu.com/goods/{}/".format(url)
            tasks.append(asyncio.ensure_future(get_url(url)))
        for task in asyncio.as_completed(tasks):
            result = await task     # 一定要加await
            print(result)
    
    
    if __name__ == '__main__':
        import time
        start_time = time.time()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        print("last time: %s" % (time.time() - start_time))
    

    ## asyncio同步和通信

    同步

    协程一般是不需要锁的

    import asyncio
     
    total = 0 
     
    async def add():
        global total
        for _ in range(1000000):
            total += 1
     
     
    async def desc():
        global total, lock
        for _ in range(1000000):
            total -= 1 
    
     
    if __name__ == '__main__':
        tasks = [add(), desc()]
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
    	print(total)
    

    上面的代码是不需要加锁的,不管运行多少次,结果都为0。但在某些情况下,我们需要加锁来使协程同步
    import asyncio
    import aiohttp
    from asyncio import Lock
    
    
    cache = {"baidu": "http://ww.baidu.com"}
    lock = Lock()   # 这里的Lock不是系统的Lock,还有async for 。。。类似的用法
    
    
    async def get_stuff(url):
        async with lock:    # 等价于with await lock
            # 这里可以使用async with 是因为 Lock中有__await__ 和 __aenter__两个魔法方法
            # 和线程一样, 这里也可以用 await lock.acquire() 并在结束时 lock.release
            if url in cache:
                return cache[url]
            stuff = await aiohttp.request("GET", url)
            cache[url] = stuff
            return stuff
    
    
    async def parse_stuff(url):
        stuff = await get_stuff(url)
        # do some parse
    
    
    async def use_stuff(url):
        stuff = await get_stuff(url)
        # use stuff to do something
    
    
    if __name__ == '__main__':
        tasks = [parse_stuff("baidu"), use_stuff("baidu")]
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
    

    这里parse_stuff和use_stuff有共同调用的代码get_stuff,parse_stuff去请求的时候,如果get_stuff也去请求,可能触发网站的反爬机制


    ### 通信

    因为协程是单线程的,所以协程完全可以使用全局变量实现queue来相互通信,但是如果想要在queue中定义存放的最大数目,那么需要使用asyncio的Queue,同时使用get和put时需要加上await

    from asyncio import Queue
    queue = Queue(maxsize=5)	
    
  • 相关阅读:
    多工作簿引用
    Record.ToTable记录到表Table.FromRecords/Record.To…(Power Query 之 M 语言)
    Sumif矩阵区域条件求和
    计算不重复
    数值到列表(Power Query 之 M 语言)
    Table.FromList列表到表Table.From…(Power Query 之 M 语言)
    图文表排版
    按比例划分等级
    Diocp HTTPServer 支持SVG
    责任声明和转载声明 .
  • 原文地址:https://www.cnblogs.com/zzliu/p/11259452.html
Copyright © 2011-2022 走看看