zoukankan      html  css  js  c++  java
  • Python中的异步任务队列 arq

    引言

    最近在用 sanic 写东西,所有涉及到IO阻塞的代码都需要用 aio 的模块,好在近年来 asyncio 生态圈发展的还算不错,该有的都有 ~
    近期业务中 登录/注册 业务涉及的很复杂(涉及到邀请),需要解锁、发送短信等操作,想来这么个模块整的很繁琐,以后加个滑动验证那还了得。
    于是乎,想整一个类似于celery 的模块,进行任务解耦,但是目前 celery 还目前不支持异步(官方将在 celery5 支持异步)。
    所以目前查阅资料发现了一个 python 实现的 arq 模块,已经应用在了生产环境,效果还算不错 ~
    官方是这么介绍它的:

    • 非阻塞
    • 延迟执行、定时任务、重试机制
    • 优雅

    首先先安装一下它:

    $ pip install arq
    

    那么接下来,快速了解下它的使用吧 ~

    简单使用

    先看下面编写的这段代码

    # filename: tasks.py
    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    # Date: 2019/5/23
    
    import asyncio
    from arq import create_pool
    from arq.connections import RedisSettings
    
    
    async def say_hello(ctx, name) -> None:
        """任务函数
    
        Parameters
        ----------
        ctx: dict
            工作者上下文
    
        name: string
    
        Returns
        -------
        dict
        """
        print(ctx)
        print(f"Hello {name}")
    
    
    async def startup(ctx):
        print("starting...")
    
    
    async def shutdown(ctx):
        print("ending...")
    
    
    async def main():
        # 创建
        redis = await create_pool(RedisSettings(password="root123456"))
        # 分配任务
        await redis.enqueue_job('say_hello', name="liuzhichao")
    
    
    # WorkerSettings定义了创建工作时要使用的设置,
    # 它被arq cli使用
    class WorkerSettings:
        # 队列使用 `redis` 配置, 可以配置相关参数
        # 例如我的密码是 `rooot123456`
        redis_settings = RedisSettings(password="root123456")
        # 被监听的函数
        functions = [say_hello]
        # 开启 `worker` 运行
        on_startup = startup
        # 终止 `worker` 后运行
        on_shutdown = shutdown
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    

    1、接下来看我们怎么运行它

    $ arq tasks.WorkerSettings
    Maybe you can see 
    
    10:56:25: Starting worker for 1 functions: say_hello
    10:56:25: redis_version=4.0.1 mem_usage=32.00M clients_connected=6 db_keys=19189
    starting...
    

    2、运行 tasks.py 文件

    $ python3 tasks.py
    Maybe you can see  
    
    11:01:04:   0.29s → 5a5ac0edd5ad4b318b9848637b1ae800:say_hello(name='liuzhichao')
    {'redis': <ArqRedis <ConnectionsPool [db:0, size:[1:10], free:1]>>, 'job_id': '5a5ac0edd5ad4b318b9848637b1ae800', 'job_try': 1, 'enqueue_time': datetime.datetime(2019, 5, 23, 3, 1, 4, 570000), 'score': 1558580464570}
    Hello liuzhichao
    11:01:04:   0.00s ← 5a5ac0edd5ad4b318b9848637b1ae800:say_hello ● 
    

    3、那么这个简单任务就执行完成了,是不是特别简单 ~

    定时任务

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    # Date: 2019/5/23
    
    from arq import cron
    from arq.connections import RedisSettings
    
    
    async def run_regularly(ctx):
        # 表示在 10、11、12 分 50秒的时候打印
        print('run job at 26:05, 27:05 and 28:05')
    
    
    class WorkerSettings:
        redis_settings = RedisSettings(password="root123456")
    
        cron_jobs = [
            cron(run_regularly, minute={10, 11, 12}, second=50)
        ]
    

    1、运行它

    $ arq tasks.WorkerSettings
    If run out of the time,maybe you can see
    
    11:10:25: Starting worker for 1 functions: cron:run_regularly
    11:10:25: redis_version=4.0.1 mem_usage=32.00M clients_connected=6 db_keys=19190
    
    11:10:51:   0.51s → cron:run_regularly()
    run foo job at 26:05, 27:05 and 28:05
    11:10:51:   0.00s ← cron:run_regularly ● 
    
    11:11:51:   0.51s → cron:run_regularly()
    run foo job at 26:05, 27:05 and 28:05
    11:11:51:   0.00s ← cron:run_regularly ● 
    
    11:12:50:   0.50s → cron:run_regularly()
    run foo job at 26:05, 27:05 and 28:05
    11:12:50:   0.00s ← cron:run_regularly ● 
    
    按照此时间线,然后会一直进行无限循环下去
    

    更多

    更多api学习请查看官方文档 --> https://arq-docs.helpmanual.io

  • 相关阅读:
    Django of python 中文文档 及debug tool
    爬虫、网页测试 及 java servlet 测试框架等介绍
    python的分布式爬虫框架
    github 上 python 的优秀库推荐列表
    github 上 机器学习 的库推荐列表
    爬虫,如何防止被ban之策略大集合
    make menuconfig 时出现 mixed implicit and normal rules: deprecated syntax
    adb通过TCP/IP连接提示 unable to connect to *, Connection refused的解决方法
    Android中使用MediaCodec硬件解码,高效率得到YUV格式帧,快速保存JPEG图片(不使用OpenGL)(附Demo)
    ThinkCMF X2.2.2多处SQL注入漏洞分析
  • 原文地址:https://www.cnblogs.com/leguan1314/p/10910465.html
Copyright © 2011-2022 走看看