zoukankan      html  css  js  c++  java
  • Celery(三)实例Application

    Celery必须实例化后才可以使用,实例称之为application或者简称app。实例是线程安全的,多个Celery实例(不同的配置、部件和任务)都可以在一个进程空间中运行。

    创建一个最简单的app:

    >>> from celery import Celery
    >>> app = Celery()
    >>> app
    <Celery __main__ at 0x7f6be52d0cd0>
    

    上述的app是一个运行在__main__模块中的Celery实例。

    Main Name

    Celery发送任务信息时,是不发送任何源代码的,只是发送要执行的任务名称,而每个worker都维持着一个任务名称到任务具体函数的映射,称之为任务注册。

    所以每个任务task都要有一个独特的不重复名称,可以看下任务默认名称是如何生成的:

    >>> @app.task
    ... def add(x, y):
    ...     return x + y
    ... 
    >>> add.name
    u'__main__.add'
    

    可见任务的名称是实例运行模块的名称加上任务函数的名称。

    现在在py文件中创建一个app实例,tasks.py:

    from celery import Celery
    app = Celery()
    
    
    @app.task
    def add(x, y): return x + y
    
    
    if __name__ == '__main__':
        print add.name
        app.worker_main()
    

    在shell中直接创建Celery实例、模块直接运行或者在命令行中运行模块,都是在main模块中运行的:

    $ python tasks.py
    __main__.add
    

    而使用import导入模块的时候,main name为定义Celery实例模块的名称:

    >>> from tasks import add
    >>> add.name
    u'tasks.add'
    

    在main模块中运行是可以手动指定实例的Main name的:

    >>> from celery import Celery
    >>> app = Celery('tasks')
    >>> app.main
    'tasks'
    

    任务的名称也可以指定:

    >>> @app.task(name='sum-of-two-numbers')
    >>> def add(x, y):
    ...     return x + y
    
    >>> add.name
    'sum-of-two-numbers'
    

    Configuration

    要为app实例添加配置有几种方式:

    创建app实例时初始化:

    app = Celery('tasks', backend='redis://localhost:6379/0',
        ┆   ┆   ┆broker='redis://localhost:6379/0')
    

    使用app.conf属性设置:

    app.conf.result_backend = 'redis://localhost:6379/0'
    app.conf.broker_url = 'redis://localhost:6379/0'
    

    update多个配置:

    >>> app.conf.update(
    ...     enable_utc=True,
    ...     timezone='Asia/Shanghai',
    ...)
    

    使用配置文件,在当前目录下或者python可以搜索到的目录下建立一个配置文件,保证可以import,celeryconfig.py :

    result_backend = 'redis://localhost:6379/0'
    broker_url = 'redis://localhost:6379/0'
    

    然后:

    app.config_from_object('celeryconfig')
    

    可以测试一下配置文件是否有格式错误:

    $ python -m celeryconfig

    也可以建立一个配置类:

    class Config:
        enable_utc = True
        timezone = 'Europe/London'
    
    app.config_from_object(Config)
    

    从环境变量中获取:

    import os
    from celery import Celery
    
    #: Set default configuration module name
    os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
    
    app = Celery()
    app.config_from_envvar('CELERY_CONFIG_MODULE')
    

    查看配置:

    >>> app.conf.humanize(with_defaults=False, censored=True)
    >>> app.conf.table(with_defaults=False, censored=True)
    

    第一条以字符串的形式返回,第二条以字典的形式返回。

    with_defaults设置为True时可以查看默认的配置,censored设置为True可以过滤掉敏感信息,包括API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE。

     Laziness

    app实例是延迟的,创建一个实例只会将app设置为current app,只有在真正需要的时候才会完成。

    实例只有在调用app.finalize()方法或者访问app.tasks属性时才会完成。

    Finalizing实例会复制apps之间可以共享的tasks,执行未确定的tasks装饰器,确定所有的tasks都绑定于current app。

    app.task装饰器并不会真正的创建task任务,直到task被调用或者app finalize完成时才创建:

    >>> from celery import Celery
    >>> app = Celery()
    >>> @app.task
    ... def add(x, y):
    ...     return x + y
    ... 
    >>> add.__evaluated__()
    False
    >>> repr(add)
    '<@task: __main__.add of __main__ at 0x7f6571694cd0>'
    >>> add.__evaluated__()
    True
    

    调用了task的__repr__方法后,task就被真正创建了。

    Breaking the chain

    当使用current app时,最好将其作为参数进行传递,称之为app chain,比较好的实践是:

    class Scheduler(object):
    
        def __init__(self, app):
            self.app = app
    

    而不要:

    from celery import current_app
    
    class Scheduler(object):
    
        def run(self):
            app = current_app
    

    通用的的写法:

    from celery.app import app_or_default
    
    class Scheduler(object):
        def __init__(self, app=None):
            self.app = app_or_default(app)
    

    开发时可以设置:

    $ CELERY_TRACE_APP=1 celery worker -l info
    

    当app chain断裂时会raise一个异常。

    Abstract Tasks

    使用app.task装饰器创建的任务都继承自Task类。

    可以自定义:

    from celery import Task
    
    class DebugTask(Task):
    
        def __call__(self, *args, **kwargs):
            print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
            return super(DebugTask, self).__call__(*args, **kwargs)
    

    然后用base参数指定:

    @app.task(base=DebugTask)
    def add(x, y):
        return x + y
    

    也可以使用app的Task参数修改:

    >>> app.Task = DebugTask

      

  • 相关阅读:
    计算机网络 chapter 6 应用层
    计算机网络 chapter 4 网络层
    计算机网络 chapter 2 物理层
    计算机网络 chapter3数据链路层
    计算机网络 chapter 1 概述
    文章
    进程池线程池 协程
    MySQL
    同步锁 死锁与递归锁 信号量 线程queue event事件
    GIL全局解释器
  • 原文地址:https://www.cnblogs.com/linxiyue/p/8067398.html
Copyright © 2011-2022 走看看