zoukankan      html  css  js  c++  java
  • [spring 并行4]异步

    异步篇 1

    介绍 2

    除了线性并行执行模式外,还有异步模式,它与事件编程一样,十分重要
    在并发的异步模式中,不同的任务在时间线上是相互交错的,而且一切都是在单一控制流(单线程)下进行的


    1.asyncio (过时)

    基本使用

    1.1 使用asyncio实现事件循环管理

    什么是事件循环?
    在计算系统中,能够产生事件的实体被称为事件源(event source),而负责协商管理事件的实体被称为事件处理器(event handler)
    它实现了管理计算代码中所有事件的功能:在程序执行期间事件循环不断周期反复,追踪某个数据内部发生的事件,将其纳入队列,如果主线程空闲则调用事件处理器一个一个地处理这些事件

    :事件循环不能使用@asyncio.coroutine标为协程

    示例1:
    延迟3秒后执行

    import asyncio
    import time
    
    def A(x):
        print(x)
        time.sleep(1)   # 使用run_forever()不能用ayncio.sleep()延时
        loop.call_soon(B)
        print('c')
    
    def B():
        print('b')
        loop.stop()
    
    
    loop = asyncio.get_event_loop()
    # loop.call_soon(A, 'a')
    loop.call_later(3.0, A, 'a')
    loop.run_forever()
    loop.close()
    
    print('end')
    

    输出:

    a
    c
    b	
    end
    

    在A()中再利用loop调用其它函数B()时,A也并不停下来,实现协程效果

    1.2使用asyncio实现协程

    什么是协程?
    当程序变得冗长复杂时,将其划分成子例程的方式会使处理变得更加便利,每个子例程完成一个特定的任务
    子例程无法独立运行,只能在主程序的要求下才能运行,主程序负责协调子例程的使用,协程就是子例程的泛化。在协程中,可以暂停执行点,同时保持干预时的本地状态,便于后续继续执行
    协程相互交错的控制组件就是事件循环,事件循环追踪全部的协程,并安排其执行时间

    协程的其它重要特点:

    1. 协程支持多个进入点,可以多次生成(yield)
    2. 协程能够执行转移至任何其它协程

    生成(yield)这个术语用于描述那些暂停并将控制流传递给另一个协程的协程,协程可以同时传递控制流和值

    示例2:
    A()和B()类似并行

    import asyncio
    
    @asyncio.coroutine
    def A():
        print('a - start')
        yield from asyncio.sleep(1)
        print('a - end')
    
    @asyncio.coroutine
    def B(x):
        print('b - start')
        result = yield from C()
        print(x)
        yield from asyncio.sleep(1)
        print(f'b :{result}')
    
    @asyncio.coroutine
    def C():
        print('c - start')
        yield from asyncio.sleep(1)
        print('c - end')
        return 'this is C return'
    
    loop = asyncio.get_event_loop()
    # loop.run_until_complete(A())   # 只执行一个
    # loop.run_until_complete(asyncio.wait([A(), B('d')]))  # 并发执行方法1
    tasks = [asyncio.Task(A()), asyncio.Task(B('b - end'))]
    loop.run_until_complete(asyncio.wait(tasks))            # 并发执行方法2
    loop.close()
    print('end')
    
    # 类asyncio.Task(coroutine)用于调度协程的执行
    # asyncio.wait(tasks)将等待给定协程执行完毕
    

    输出:

    a - start
    b - start
    c - start
    a - end
    c - end
    b - end
    b :this is C return
    end
    

    分析:asyncio.sleep()期间,主线程并未等待,而是去执行EventLoop中可执行的coroutine

    @asyncio.coroutine把一个generator标记为coroutine类型,再把这个coroutine放到EventLoop中执行(实测,可以不@标记)

    相关方法

    loop = get_event_loop() : 获得当前上下文的事件循环
    如果close()关闭了后,重新打开需要以下操作:
    loop = asyncio.new_event_loop() : 创建新的时间循环对象
    asyncio.set_event_loop(loop) : 将当前上下文的时间循环设置为指定的循环

    loop.call_soon(callback, *args) : 立即调用回调对象,参数
    loop.call_later(delay, callback, *args) : 延时delay秒后,调用回调对象
    loop.call_at(when, callback, *args) : 在指定的时间调用回调对象,(when是绝对时间,可以参考loop.time()设置)

    loop.run_forever() : 一直执行,直到调用stop()
    loop.run_until_complete(future) : 运行指定的协程函数(Future3

    loop.time() : 获取事件循环的内部时钟
    loop.close() : 关闭事件循环
    loop.is_running() : 是否运行中
    loop.is_close() : 是否关闭

    使用示例

    示例:
    异步网络并行访问

    import asyncio
    
    @asyncio.coroutine
    def wget(host):
        print('wget %s...' % host)
        connect = asyncio.open_connection(host, 80)
        reader, writer = yield from connect
        header = 'GET / HTTP/1.0
    Host: %s
    
    ' % host
        writer.write(header.encode('utf-8'))
        yield from writer.drain()
        while True:
            line = yield from reader.readline()
            if line == b'
    ':
                break
            print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
        # Ignore the body, close the socket
        writer.close()
    
    loop = asyncio.get_event_loop()
    tasks = [wget(host) for host in ['www.baidu.com', 'www.aliyun.com', 'www.qq.com']]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    

    2.async/await

    2.1 客户端使用

    为了简化标识异步io,python3.5引入新语法asyncawait
    只需将2步替换:

    1. asyncio.coroutine -> async
    2. yield from -> await

    示例:

    import asyncio
    
    @asyncio.coroutine
    def A():
        print('a')
        yield from asyncio.sleep(1)
        print('c')
    
    loop = asyncio.get_event_loop()
    tasks = [asyncio.Task(A())]
    loop.run_until_complete(asyncio.wait(tasks))            # 并发执行方法2
    loop.close()
    print('end')
    

    替换为:

    import asyncio
    
    async def A():
        print('a')
        await asyncio.sleep(1)
        print('c')
    
    loop = asyncio.get_event_loop()
    tasks = [asyncio.Task(A())]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print('end')
    

    2.2 服务器端使用

    asyncio可以实现单线程并发io操作,如果仅用于客户端,效果不大
    可以用在服务器端,由于HTTP连接就是io操作,因此可以使用单线程+协程实现多用户的高并发

    asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架

    示例:
    启动一个web服务,通过浏览器访问localhost:8000

    import asyncio
    
    from aiohttp import web
    
    async def index(request):
        await asyncio.sleep(0.5)
        return web.Response(body='<h1>Index</h1>')
    
    async def hello(request):
        await asyncio.sleep(0.5)
        text = '<h1>hello, %s!</h1>' % request.match_info['name']
        return web.Response(body=text)
    
    async def init(loop):
        app = web.Application(loop=loop)
        app.router.add_route('GET', '/', index)
        app.router.add_route('GET', '/hello/{name}', hello)
        srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
        print('Server started at http://127.0.0.1:8000...')
        return srv
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()
    

    1.参考书籍: 参考书籍:《Python并行编程手册》

    2.参考文章1: 这篇主要参考:廖雪峰 - asyncio:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/00143208573480558080fa77514407cb23834c78c6c7309000

    3.Future: Future:是Asyncio的一个类,与concurrent.futures.Futures非常相似,Futures类代表一个还不可用的结果,它是对尚未完成的任务的抽象表示;Python 3.2引入concurrent.futures模块,支持管理并发编程任务,如进程池和线程池、非确定性执行流、多进程、线程同步(这个目前没看出有什么特别的,池化管理不是多线程和多进程库自带吗?concurrent.futures.ProcessPoolExecutor

  • 相关阅读:
    工厂模式 ioc dom4j 反射之我的一点理解
    hibernate中注解方式中的控制反转
    java中的数据存储(堆,栈) 很重要
    hibernate中映射关系总结
    三极管使用方法
    OC OD介绍
    HP Jack介绍
    Jlink接口的Jtag和SWD接口定义
    什么是域什么是工作组
    Ubuntu安装.run文件
  • 原文地址:https://www.cnblogs.com/maplesnow/p/12044363.html
Copyright © 2011-2022 走看看