zoukankan      html  css  js  c++  java
  • celery --分布式任务队列

    一、介绍

    celery是一个基于python开发的分布式异步消息任务队列,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
    它是一个任务队列,专注于实时处理,同时还支持任务调度。如果你的业务场景中需要用到异步任务,就可以考虑使用celery

    二、实例场景

    1、你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
    2、你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

    三、优点

    • 1、简单:一但熟悉了celery的工作流程后,配置和使用还是比较简单的
    • 2、高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
    • 3、快速:一个单进程的celery每分钟可处理上百万个任务
    • 4、灵活:几乎celery的各个组件都可以被扩展及自定制

    四、入门

    celery 需要一个解决方案来发送和接受消息,通常,这是以称为消息代理的单独服务的形式出现的
    有以下几种解决方案,包括:
    一:RabbitMQ(消息队列,一种程序之间的通信方式)
    rabbitmq 功能齐全,稳定,耐用且易于安装。它是生产环境的绝佳选择。
    如果您正在使用Ubuntu或Debian,请执行以下命令安装RabbitMQ:

    $ sudo apt-get install rabbitmq-server

    命令完成后,代理已经在后台运行,准备为您移动消息:。Starting rabbitmq-server: SUCCESS
    二、redis

    redis功能齐全,但在突然中止或者电源故障时更容易丢失数据

    五、安装

    $ pip install celery 

    六、应用

    创建一个tasks.py文件

    from celery import Celery
    
    app = Celery('tasks', broker='pyamqp://guest@localhost//')
    @app.task
    def add(x, y):
        return x + y

    第一个参数Celery是当前模块的名称。只有在__main__模块中定义任务时才能自动生成名称。
    第二个参数是broker关键字参数,指定要使用的消息代理的URL。这里使用RabbitMQ(也是默认选项)。
    您可以使用RabbitMQ amqp://localhost,或者您可以使用Redis redis://localhost。
    您定义了一个名为add的任务,返回两个数字的总和。

     1 from __future__ import absolute_import
     2             import os
     3             from celery import Celery
     4             from django.conf import settings
     5             # set the default Django settings module for the 'celery' program.
     6             os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'saruman_server.settings')
     7             app = Celery('saruman_server')
     8 
     9             # Using a string here means the worker will not have to
    10             # pickle the object when using Windows.
    11             app.config_from_object('django.conf:settings')
    12             app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    13 
    14             @app.task(bind=True)
    15             def debug_task(self):
    16                 print('Request: {0!r}'.format(self.request))
    和django配合实例

    七、运行celery工作服务器

    您现在可以通过使用worker 参数执行我们的程序来运行worker :

    celery -A tasks worker --loglevel=info

    有关可用命令行选项的完整列表,请执行以下操作:

    $ celery worker --help

    还有其他几个可用的命令,也可以提供帮助:

    $ celery help

    八、调用任务

    要调用我们的任务,您可以使用该delay()方法。
    apply_async() 可以更好地控制任务执行

    >>> from tasks import add
    >>> add.delay(4, 4)

    调用任务会返回一个AsyncResult实例。这可用于检查任务的状态,等待任务完成,或获取其返回值(或者如果任务失败,则获取异常和回溯)。

    九、保持结果

    如果您想跟踪任务的状态,Celery需要在某处存储或发送状态。有几个内置的结果后端可供选择:SQLAlchemy / Django ORM, Memcached,Redis,RPC(RabbitMQ / AMQP),以及 - 或者您可以定义自己的。
    在本例中,我们使用rpc结果后端,它将状态作为瞬态消息发回。后端通过backend参数 指定Celery

    app = Celery('tasks', backend='rpc://', broker='pyamqp://')

    或者,如果您想使用Redis作为结果后端,但仍然使用RabbitMQ作为消息代理(一种流行的组合):

    app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

    现在配置了结果后端,让我们再次调用该任务。这次你将保持AsyncResult调用任务时返回的实例:

    >>> result = add.delay(4, 4)

    该ready()方法返回任务是否已完成处理:

    >>> result.ready()
    False 

    十、配置

    与消费类电器一样,celery不需要太多配置即可运行。它有一个输入和一个输出。输入必须连接代理,输出可以
    选择到结果后端。
    可以直接在应用程序上或使用专用配置模块设置配置。例如,您可以通过更改task_serializer设置来配置用于序列化任务有效负载的默认序列化程序:

    app.conf.task_serializer = 'json'

    如果您一次配置了许多设置,则可以使用update:

    app.conf.update(
    task_serializer='json',
    accept_content=['json'], # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
    )

    对于大型项目,建议使用专用配置模块。不鼓励硬编码周期性任务间隔和任务路由选项。将它们保存在集中位置要好得多。对于库来说尤其如此,因为它使用户能够控制其任务的行为方式。集中配置还允许您的SysAdmin在发生系统故障时进行简单的更改。
    您可以通过调用app.config_from_object()方法告诉Celery实例使用配置模块:

    app.config_from_object('celeryconfig')

    此模块通常称为“ celeryconfig”,但您可以使用任何模块名称。
    在上面的例子中,一个名为的模块celeryconfig.py必须可以从当前目录或Python路径加载。它可能看起来像这样:
    celeryconfig.py:

    broker_url = 'pyamqp://'
    result_backend = 'rpc://'
    
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    timezone = 'Europe/Oslo'
    enable_utc = True
     1 from datetime import timedelta
     2 
     3 import djcelery
     4 
     5 djcelery.setup_loader()
     6 BROKER_URL = 'amqp://guest@localhost//'  #输入
     7 CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'  #返回的结果
     8 
     9 #导入指定的任务模块
    10 CELERY_IMPORTS = (
    11     'fir.app.fir.tasks',
    12 )
    13 
    14 CELERYBEAT_SCHEDULE = {
    15     'receive_mail': {
    16         "task": "fir.app.fir.tasks.receive_mail",
    17         "schedule": timedelta(seconds=5),
    18         "args": (),
    19     },
    20 }
    View Code 

    要验证配置文件是否正常工作且不包含任何语法错误,您可以尝试导入它:
    ####################################################

    python -m celeryconfig

    为了演示配置文件的强大功能,您可以将行为不当的任务路由到专用队列:

    celeryconfig.py:
    task_routes = {
    'tasks.add': 'low-priority',
    }
    或者不是路由它,而是可以对任务进行速率限制,这样在一分钟(10 / m)内只能处理10种此类任务:

    celeryconfig.py:
    task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
    }
    如果您使用RabbitMQ或Redis作为代理,那么您还可以指示工作人员在运行时为任务设置新的速率限制:

    $ celery -A tasks control rate_limit tasks.add 10/m
    worker@example.com: OK
    new rate limit set successfully

    十一、在项目中如何使用celery

    1、可以把celery配置成一个应用
    2、目录结构如下:

    proj/__init__.py
        /celery.py
        /tasks.py

    3、proj/celery.py内容

    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    
    app = Celery('proj',
        broker='amqp://',
        backend='amqp://',
        include=['proj.tasks'])
    
    # Optional configuration, see the application user guide.
    app.conf.update(
        result_expires=3600,
    )
    
    if __name__ == '__main__':
        app.start()

    4、proj/tasks.py中的内容

    from __future__ import absolute_import, unicode_literals
    from .celery import app
    
    
    @app.task
    def add(x, y):
        return x + y
    
    
    @app.task
    def mul(x, y):
        return x * y
    
    
    @app.task
    def xsum(numbers):
        return sum(numbers)

    5、启动worker

    $ celery -A proj worker -l info

    输出

    -------------- celery@Alexs-MacBook-Pro.local v4.0.2 (latentcall)
    ---- **** -----
    --- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-26 21:50:24
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app: proj:0x103a020f0
    - ** ---------- .> transport: redis://localhost:6379//
    - ** ---------- .> results: redis://localhost/
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
    -------------- [queues]
    .> celery exchange=celery(direct) key=celery

    django 中使用celery:参考链接:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django 

    十二、监控工具flower

    如果有些任务出现问题,可以用flower工具监控(基于tornado)
    安装:pip install flower

    使用:
    三种启动方式

    celery flower
    celery flower --broker 
    python manage.py celery flower #就能读取到配置里的broker_url 默认是rabbitmq

    打开运行后的链接
    打开worker
    python manage.py celery worker -l info

  • 相关阅读:
    hashlib模块
    logging模块
    Python的富比较方法
    格式化符号说明
    __str__与__repr__区别
    2014-07-18 10:25

    2014-07-17 17:04
    2014-07-17 16:44
    2014-07-16 15:54
  • 原文地址:https://www.cnblogs.com/haiyan123/p/9835957.html
Copyright © 2011-2022 走看看