zoukankan      html  css  js  c++  java
  • Python协程(下)

    停止子线程

    如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,会抛出KeyboardInterrupt错误,我们修改一下主循环:

    try:
        while True:
            task = rcon.rpop("queue")
            if not task:
                time.sleep(1)
                continue
            asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
    except KeyboardInterrupt as e:
        print(e)
        new_loop.stop()
    

    可是实际上并不好使,虽然主线程try了KeyboardInterrupt异常,但是子线程并没有退出,为了解决这个问题,可以设置子线程为守护线程,这样当主线程结束的时候,子线程也随机退出。

    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.setDaemon(True)    # 设置子线程为守护线程
    t.start()
     
    try:
        while True:
            # print('start rpop')
            task = rcon.rpop("queue")
            if not task:
                time.sleep(1)
                continue
            asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
    except KeyboardInterrupt as e:
        print(e)
        new_loop.stop()
    

    线程停止程序的时候,主线程退出后,子线程也随机退出才了,并且停止了子线程的协程任务

    aiohttp

    在消费队列的时候,我们使用asyncio的sleep用于模拟耗时的io操作。以前有一个短信服务,需要在协程中请求远程的短信api,此时需要是需要使用aiohttp进行异步的http请求。大致代码如下:

    server.py

    import time
    from flask import Flask, request
     
    app = Flask(__name__)
     
    @app.route('/<int:x>')
    def index(x):
        time.sleep(x)
        return "{} It works".format(x)
     
    @app.route('/error')
    def error():
        time.sleep(3)
        return "error!"
     
    if __name__ == '__main__':
        app.run(debug=True)
    

    /接口表示短信接口,/error表示请求/失败之后的报警。

    async-custoimer.py

    import time
    import asyncio
    from threading import Thread
    import redis
    import aiohttp
     
    def get_redis():
        connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)
        return redis.Redis(connection_pool=connection_pool)
     
    rcon = get_redis()
     
    def start_loop(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
     
    async def fetch(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                print(resp.status)
                return await resp.text()
     
    async def do_some_work(x):
        print('Waiting ', x)
        try:
            ret = await fetch(url='http://127.0.0.1:5000/{}'.format(x))
            print(ret)
        except Exception as e:
            try:
                print(await fetch(url='http://127.0.0.1:5000/error'))
            except Exception as e:
                print(e)
        else:
            print('Done {}'.format(x))
     
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.setDaemon(True)
    t.start()
     
    try:
        while True:
            task = rcon.rpop("queue")
            if not task:
                time.sleep(1)
                continue
            asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
    except Exception as e:
        print('error')
        new_loop.stop()
    finally:
        pass
    

    有一个问题需要注意,我们在fetch的时候try了异常,如果没有try这个异常,即使发生了异常,子线程的事件循环也不会退出。主线程也不会退出,暂时没找到办法可以把子线程的异常raise传播到主线程。(如果谁找到了比较好的方式,希望可以带带我)。

    对于redis的消费,还有一个block的方法:

    try:
        while True:
            _, task = rcon.brpop("queue")
            asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
    except Exception as e:
        print('error', e)
        new_loop.stop()
    finally:
        pass
    

    使用 brpop方法,会block住task,如果主线程有消息,才会消费。测试了一下,似乎brpop的方式更适合这种队列消费的模型。

    127.0.0.1:6379[3]> lpush queue 5
    (integer) 1
    127.0.0.1:6379[3]> lpush queue 1
    (integer) 1
    127.0.0.1:6379[3]> lpush queue 1
    

    可以看到结果

    Waiting  5
    Waiting  1
    Waiting  1
    200
    1 It works
    Done 1
    200
    1 It works
    Done 1
    200
    5 It works
    Done 5
    

    协程消费

    主线程用于监听队列,然后子线程的做事件循环的worker是一种方式。还有一种方式实现这种类似master-worker的方案。即把监听队列的无限循环逻辑一道协程中。程序初始化就创建若干个协程,实现类似并行的效果。

    import time
    import asyncio
    import redis
     
    now = lambda : time.time()
     
    def get_redis():
        connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)
        return redis.Redis(connection_pool=connection_pool)
     
    rcon = get_redis()
     
    async def worker():
        print('Start worker')
     
        while True:
            start = now()
            task = rcon.rpop("queue")
            if not task:
                await asyncio.sleep(1)
                continue
            print('Wait ', int(task))
            await asyncio.sleep(int(task))
            print('Done ', task, now() - start)
     
    def main():
        asyncio.ensure_future(worker())
        asyncio.ensure_future(worker())
     
        loop = asyncio.get_event_loop()
        try:
            loop.run_forever()
        except KeyboardInterrupt as e:
            print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
            loop.stop()
            loop.run_forever()
        finally:
            loop.close()
     
    if __name__ == '__main__':
        main()
    

    这样做就可以多多启动几个worker来监听队列。一样可以到达效果。

  • 相关阅读:
    Java基础之Comparable与Comparator
    Java基础之访问权限控制
    Java基础之抽象类与接口
    Java基础之多态和泛型浅析
    Spring MVC入门
    Spring框架之事务管理
    伸展树(Splay Tree)进阶
    2018牛客网暑期ACM多校训练营(第三场) A
    2018牛客网暑期ACM多校训练营(第三场) H
    HDU 6312
  • 原文地址:https://www.cnblogs.com/cnkai/p/7642779.html
Copyright © 2011-2022 走看看