zoukankan      html  css  js  c++  java
  • celery--实现异步任务

    前戏

    已经安装好了celery,redis模块,还安装好了redis服务。

    新建两个py文件,一个为task,一个为demo,内容如下。

    task.py

    import time
    
    def test(name):
        time.sleep(5)
        print(f'hello {name}')
        return f'{name}'

    demo.py

    from task import test
    
    if __name__ == '__main__':
        test('jack')

    这两个文件内容相信大家都能看懂(看不懂也没关系)

    执行demo.py会输出hello jack

    使用celery实现异步

    声明:本人的redis服务和代码都在远程服务器上,本地的pycharm也上连接的远程服务器

    修改task.py

    import time
    from celery import Celery
    
    # 实例化Celery
    app = Celery('celery_test', broker='redis://:redis666@127.0.0.1:6379', backend='redis://:redis666@127.0.0.1:6379')
    # redis666为redis的密码,前面要加: 后面要加@
    
    @app.task
    def test(name):
        time.sleep(5)
        print(f'hello {name}')
        return 'wahaha'

    其中的broker是存放缓存任务的。backend是存放缓存任务结果的。这里是存放在redis里。

    修改demo.py

    from task import test
    
    if __name__ == '__main__':
        # 将任务存放在broker里
        test.delay('jack')  # 这样调用

    在服务器上启动celery worker,我的是在虚拟环境里安装的celery,所以要进入到虚拟环境在执行

    celery -A task worker -l INFO

    也可以加上-c参数  celery worker -A task -c 6 -l INFO

    其中 -A task 表示只运行task里的任务。-c 启动的worker数量,这里为6。-l 日志级别,这里为INFO级别。 

     执行demo.py,查看celery的work

    celery的配置文件抽取

    目录结构如下

    apps下的__init__.py内容

    from celery import Celery
    
    app = Celery('test_task')
    
    app.config_from_object('apps.celery_conf')

    celery_conf.py的内容

    BROKER_URL = 'redis://:redis666@127.0.0.1:6379/1'
    
    CELERY_RESULT_BACKEND = 'redis://:redis666@127.0.0.1:6379/2'
    
    # 需要导入task1和task2
    CELERY_IMPORTS = (
        'apps.task1',
        'apps.task2'
    )

    task1.py的内容

    from  apps import app
    
    @app.task
    def add(x,y):
        return x+y

    task2.py的内容

    from  apps import app
    
    @app.task
    def subs(x,y):
        return x-y

    demo.py的内容

    from apps.task1 import add
    from apps.task2 import subs
    
    if __name__ == '__main__':
        add.delay(3,5)
        subs.delay(8,6)

    启动celery worker

    运行demo.py

    celery的常用配置

    上面我们创建里celery_conf.py文件,用来存放celery的配置。其他的配置我们也可以写在里面,常用的配置如下

    BROKER_URL = 'redis://:redis666@127.0.0.1:6379/1'
    
    CELERY_RESULT_BACKEND = 'redis://:redis666@127.0.0.1:6379/2'
    
    # 需要导入task1和task2
    CELERY_IMPORTS = (
        'apps.task1',
        'apps.task2'
    )
    
    # 时区设置。默认UTC
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    # 是否使用本地的时区,False时将使用本地的时区
    CELERY_ENABLE_UTC = False
    
    # 重写task的属性,限制tasks模块下的add函数,每秒钟只能执行10次
    CELERY_ANNOTATIONS = {'tasks.add':{'rate_limit':'10/s'}}
    
    # 连接错误情况下是否重试发布任务消息,默认为True
    CELERY_TASK_PUBLISH_RETRY = False
    
    # 并发的worker数量,也是命令行-c指定的数目
    # 事实上并不是worker数量越多越好,保证任务不堆积,加上一些新增任务的预留就可以了
    CELERYD_CONCURRENCY = 20
    
    # 每次worker去任务队列中取的任务数量
    CELERY_PREFETH_MULTIPLIRE = 5
    
    # 每个worker执行多少次被杀掉
    CELERYD_MAX_TASKS_PER_CHILD = 200
    
    # 单个任务的最大执行时间
    CELERY_TASK_TIME_LIMIT = 60
    
    # celery任务执行结果的超时时间
    CELERY_TASK_RESULT_EXPIRES = 1000

    wins下启动worker报错

    如果你在wins下执行了 celery worker -A task -l INFO,运行demo.py后,celery报如下错误

    这是因为3.x之后的celery不支持wins导致的。我们只需要在安装一个第三方库eventlet就可以了

    pip install eventlet

    然后我们启动worker就不能以上面的方式启动了,需要加个 -P 参数

    celery worker -A task -l INFO -P eventlet

    然后执行demo.py,worker就不会报错了

  • 相关阅读:
    并查集模板
    143. 最大异或对(Trie树存整数+二进制)
    Trie树模板
    835. 字符串统计(Trie树模板题)
    生兔兔
    汉诺塔问题
    一本通 1296:开餐馆
    一本通 1272:【例9.16】分组背包
    一本通 1292:宠物小精灵之收服
    一本通 1271:【例9.15】潜水员
  • 原文地址:https://www.cnblogs.com/zouzou-busy/p/12093739.html
Copyright © 2011-2022 走看看