zoukankan      html  css  js  c++  java
  • IO框架:asyncio 下篇

    动态添加协程

    在实战之前,我们要先了解下在asyncio中如何将协程态添加到事件循环中的。这是前提。

    如何实现呢,有两种方法:

    • 主线程是同步
    import time
    import asyncio
    from queue import Queue
    from threading import Thread
    
    def start_loop(loop):
        # 一个在后台永远运行的事件循环
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    def do_sleep(x, queue, msg=""):
        time.sleep(x)
        queue.put(msg)
    
    queue = Queue()
    
    new_loop = asyncio.new_event_loop()
    
    # 定义一个线程,并传入一个事件循环对象
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()
    
    print(time.ctime())
    
    # 动态添加两个协程
    # 这种方法,在主线程是同步的
    new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一个")
    new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二个")
    
    while True:
        msg = queue.get()
        print("{} 协程运行完..".format(msg))
        print(time.ctime())

    由于是同步的,所以总共耗时6+3=9秒.

    输出结果


    Thu May 31 22:11:16 2018
    第一个 协程运行完..
    Thu May 31 22:11:22 2018
    第二个 协程运行完..
    Thu May 31 22:11:25 2018
    • 主线程是异步
    import time
    import asyncio
    from queue import Queue
    from threading import Thread
    
    def start_loop(loop):
        # 一个在后台永远运行的事件循环
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    async def do_sleep(x, queue, msg=""):
        await asyncio.sleep(x)
        queue.put(msg)
    
    queue = Queue()
    
    new_loop = asyncio.new_event_loop()
    
    # 定义一个线程,并传入一个事件循环对象
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()
    
    print(time.ctime())
    
    # 动态添加两个协程
    # 这种方法,在主线程是异步的
    asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)
    
    while True:
        msg = queue.get()
        print("{} 协程运行完..".format(msg))
        print(time.ctime())
    
    输出结果 由于是同步的,所以总共耗时max(
    6, 3)=6秒 Thu May 31 22:23:35 2018 第二个 协程运行完.. Thu May 31 22:23:38 2018 第一个 协程运行完.. Thu May 31 22:23:41 2018

    实战:利用redis实现动态添加任务

    对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。

    为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。

    服务开启后,我们就可以运行我们的客户端了。
    并依次输入key=queue,value=5,3,1的消息。

     一切准备就绪之后,我们就可以运行我们的代码了。

    import time
    import redis
    import asyncio
    from queue import Queue
    from threading import Thread
    
    def start_loop(loop):
        # 一个在后台永远运行的事件循环
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    async def do_sleep(x, queue):
        await asyncio.sleep(x)
        queue.put("ok")
    
    def get_redis():
        connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
        return redis.Redis(connection_pool=connection_pool)
    
    def consumer():
        while True:
            task = rcon.rpop("queue")
            if not task:
                time.sleep(1)
                continue
            asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)
    
    
    if __name__ == '__main__':
        print(time.ctime())
        new_loop = asyncio.new_event_loop()
    
        # 定义一个线程,运行一个事件循环对象,用于实时接收新任务
        loop_thread = Thread(target=start_loop, args=(new_loop,))
        loop_thread.setDaemon(True)
        loop_thread.start()
        # 创建redis连接
        rcon = get_redis()
    
        queue = Queue()
    
        # 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务
        consumer_thread = Thread(target=consumer)
        consumer_thread.setDaemon(True)
        consumer_thread.start()
    
        while True:
            msg = queue.get()
            print("协程运行完..")
            print("当前时间:", time.ctime())

    输出结果

    Thu May 31 23:42:48 2018
    协程运行完..
    当前时间: Thu May 31 23:42:49 2018

    协程运行完..
    当前时间: Thu May 31 23:42:51 2018

    协程运行完..
    当前时间: Thu May 31 23:42:53 2018
    • loop_thread:单独的线程,运行着一个事件对象容器,用于实时接收新任务。
    • consumer_thread:单独的线程,实时接收来自Redis的消息队列,并实时往事件对象容器中添加新任务。

    我们在Redis,分别发起了5s3s1s的任务。
    从结果来看,这三个任务,确实是并发执行的,1s的任务最先结束,三个任务完成总耗时5s

    运行后,程序是一直运行在后台的,我们每一次在Redis中输入新值,都会触发新任务的执行。

    出处:https://www.cnblogs.com/wongbingming/p/9124142.html

  • 相关阅读:
    [每日电路图] 2、红外遥控电路原理设计与解析【转+解读】
    [每日电路图] 1、基于AT89C52单片机最小系统接口电路【转】
    [nRF51822] 5、 霸屏了——详解nRF51 SDK中的GPIOTE(从GPIO电平变化到产生中断事件的流程详解)
    [nRF51822] 4、 图解nRF51 SDK中的Schedule handling library 和Timer library
    [nRF51822] 3、 新年也来个总结——图解nRF51 SDK中的Button handling library和FIFO library
    [MFC] VS2013版本MFC工程移植到VC6.0上
    [异常解决] ubuntu上安采用sudo启动的firefox,ibus输入法失效问题解决
    [编译] 1、第一个makefile简单例子
    nginx静态文件不设置缓存
    Docker容器挂载文件(转载)
  • 原文地址:https://www.cnblogs.com/miaoweiye/p/12022546.html
Copyright © 2011-2022 走看看