zoukankan      html  css  js  c++  java
  • Celery-分布式任务队列

    一、介绍

    官方文档:http://docs.celeryproject.org/en/latest/index.html

    pip3 install celery

    Celery是一个专注于实时处理和任务调度的分布式任务队列,通过它可以轻松的实现任务的异步处理。

    使用Celery的常见场景:

    • Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
    • 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
    • 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。

    Celery包含如下组件:

    • Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
    • Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
    • Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
    • Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
    • Result Backend:任务处理完后保存状态信息和结果,以供查询。

    二、简单示例

    创建一个tasks.py:

    from celery import Celery
    
    app = Celery(
        "tasks", 
        broker="amqp://pd:123456@localhost:5672//",
        backend="redis://:123456@localhost:6379/0")
    
    @app.task
    def add(x, y):
        return x+y

    启动Celery Worker来开始监听并执行任务:

    celery -A tasks worker -l info

    更多有关命令:

    celery worker --help
    

    再打开一个终端, 进行命令行模式,调用任务:

    >>> from tasks import add
    >>> relt = add.delay(10, 10)
    >>> relt.ready()  # 检查任务是否已经完成
    True
    >>> relt.get()    # 获取任务结果,可设置timeout超时
    20
    >>> relt
    <AsyncResult: 470d5f45-42eb-4b0c-bd38-06b85fa5599b>
    >>> relt.id
    '470d5f45-42eb-4b0c-bd38-06b85fa5599b'
    >>> relt.result
    20
    >>> relt.status
    'SUCCESS'
    from celery import Celery
    from celery.result import AsyncResult
    
    app = Celery(
        "tasks", 
        broker="amqp://pd:123456@localhost:5672/pdvhost",
        backend="redis://:123456@localhost:6379/0")
    
    result = AsyncResult(id="470d5f45-42eb-4b0c-bd38-06b85fa5599b", app=app)
    print(result.get())  # 20
    View Code

    三、配置

    官方文档,配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html#configuration

    像上面简单示例中,要想添加配置,则可以直接在应用程序设置配置:

    app.conf.task_serializer = "json"
    

    如果您一次配置多个设置,则:

    app.conf.update(
        task_serializer="json",
        accept_content=["json"],
        result_serializer="json",
        timezone="Europe/Oslo",
        enable_utc=True,
    )
    

    对于大型项目,建议使用专用配置模块。因为项目复杂,最好做到程序的解耦,所以将配置保存在集中位置是一个非常好的选择,一般默认 celeryconfig.py 模块是用来保存配置的,你也可以使用自己定义的名字,然后通过调用 app.config_from_object() 方告诉 Celery 实例使用配置模块

    app.config_from_object("celeryconfig")
    # 或者
    from . import celeryconfig
    app.config_from_object(celeryconfig)

    四、在项目中使用Celery

    项目布局:

    方案选择:

    1. RabbitMQ作为消息代理。不选Redis是因为如果Redis发生意外,会造成数据丢失等后果。
    2. Msgpack做序列化。Msgpack是一个二进制的类json的序列化方案,它比json的数据结构更小,传输更快。
    3. Redis做结果存储。
    pip3 install msgpack
    ########## celeryapp.py ##########
    from celery import Celery
    from . import celeryconfig
    
    app = Celery("proj.celeryapp", include=["proj.tasks"])
    app.config_from_object(celeryconfig)
    
    if __name__ == "__main__":
        app.start()
    
    ########## tasks.py ##########
    from .celeryapp import app
    
    @app.task
    def add(x, y):
        return x+y
    
    @app.task
    def mul(x, y):
        return x*y
    
    ########## celeryconfig.py ##########
    # 使用RabbitMQ作为消息代理
    broker_url = "amqp://pd:123456@114.116.50.214:5672//"
    # # 把任务结果存在了Redis
    result_backend = "redis://:123456@114.116.50.214:6379/0"
    # 任务序列化和反序列化使用msgpack方案
    task_serializer = "msgpack"
    # 读取任务结果一般性能要求不高,所以使用了可读性更好的json
    result_serializer = "json"
    # 任务过期时间
    result_expires = 60*60*24
    # 指定接受的内容类型
    accept_content = ["json", "msgpack"]
    代码示例

    五、在后台运行worker

    在生产中,我们需要在后台运行worker,官方文档daemonization教程中有详细描述。

    守护程序脚本使用celery multi命令在后台启动一个或多个worker:

    # 启动worker后台运行
    celery multi start w1 -A proj.celeryapp -l info
    celery multi start w2 -A proj.celeryapp -l info
    PS:如果使用的是默认的celery.py,那么直接proj即可
    
    # 重启
    celery multi restart w1 -A proj -l info
    
    # 停止
    celery multi stop w1 -A proj -l info
    
    # 确保退出之前完成所有当前正在执行的任务
    celery multi stopwait w1 -A proj -l info

    默认情况下,它会在当前目录下创建的pid和日志文件,为了防止多个worker在彼此之上启动,最好将这些文件放在专用目录中:

    mkdir /var/run/celery
    mkdir /var/log/celery
    celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log

    六、指定队列传送任务

    官方文档:https://celery.readthedocs.io/en/latest/userguide/routing.html#guide-routing

    在 celeryconfig.py 中加入以下配置:

    # 路由键以 task. 开头的消息都进default队列
    # 路由键以 web. 开头的消息都进web_tasks队列
    task_queues = (
        Queue("default", routing_key="task.#"),
        Queue("web_tasks", routing_key="web.#"),
    )
    # 默认的交换机名字为tasks
    task_default_exchange = "tasks"
    # 设置默认交换类型为topic
    task_default_exchange_type = "topic"
    # 默认的路由键是 task.default
    task_default_routing_key = "task.default"
    # 要将任务路由到web_tasks队列,可以在task_routes设置中添加条目
    task_routes = {
        # tasks.add的消息会进入web_tasks队列
        "proj.tasks.add": {
            "queue": "web_tasks",
            "routing_key": "web.add",
        },
    }

    其他代码与上面 四 中的相同。

    启动worker,指定该worker工作于哪个队列:

    # 该worker只会执行web_tasks队列中的任务
    celery -A proj.celeryapp worker -Q web_tasks -l info

    七、定时任务

    官方文档:https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html

    Celery支持定时任务,设定好任务的执行时间,Celery就会定时自动帮你执行, 这个定时任务模块叫 celery beat。

    函数版tasks.py:

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery("tasks", broker="amqp://pd:123456@localhost:5672//", backend="redis://:123456@localhost:6379/0")
    app.conf.timezone = "Asia/Shanghai"
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 每5秒执行一次 test("Hello")
        sender.add_periodic_task(5.0, test.s("Hello"), name="every-5s")
        # 每10秒执行一次 test("World")
        sender.add_periodic_task(10.0, test.s("World"), name="every-10s", expires=5)
        # 每周一早上 7:30 执行一次 test("Happy Mondays!")
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s("Happy Mondays!"),
        )
    
    @app.task
    def test(arg):
        print(arg)
    View Code
    celery -A tasks worker -l info
    celery -A tasks beat -l info

    配置版:

    ########## celeryapp.py ##########
    from celery import Celery
    from . import celeryconfig
    
    app = Celery("proj.celeryapp", include=["proj.tasks"])
    app.config_from_object(celeryconfig)
    
    if __name__ == "__main__":
        app.start()
    
    ########## celeryconfig.py ##########
    broker_url = "amqp://pd:123456@114.116.50.214:5672//"
    result_backend = "redis://:123456@114.116.50.214:6379/0"
    task_serializer = "msgpack"
    result_serializer = "json"
    result_expires = 60*60*24
    accept_content = ["json", "msgpack"]
    timezone = "Asia/Shanghai"
    
    from celery.schedules import crontab
    beat_schedule = {
        "every-10s": {
            "task": "proj.tasks.add",
            "schedule": 10.0,
            "args": (10, 10)
        },
        "every-monday-morning-7:30": {
            "task": "proj.tasks.mul",
            "schedule": crontab(hour=7, minute=30, day_of_week=1),
            "args": (10, 10)
        }
    }
    
    ########## tasks.py ##########
    from .celeryapp import app
    
    @app.task
    def add(x, y):
        return x+y
    
    @app.task
    def mul(x, y):
        return x*y
    View Code
    celery -A proj.celeryapp worker -l info
    celery -A proj.celeryapp beat -l info 

    八、在Django中使用celery

    发布任务

    https://celery.readthedocs.io/en/latest/django/first-steps-with-django.html#extensions

    项目布局:

    import os
    from celery import Celery
    
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
    app = Celery("mysite")
    app.config_from_object("django.conf:settings", namespace="CELERY")
    app.autodiscover_tasks()
    
    @app.task(bind=True)
    def debug_task(self):
        print("Request: {0!r}".format(self.request))
    celeryapp.py
    from .celeryapp import app as celery_app
    __all__ = ["celery_app"]
    __init__.py

    settings.py,更多设置参考:https://celery.readthedocs.io/en/latest/userguide/configuration.html

    #for celery
    CELERY_BROKER_URL = "amqp://pd:123456@114.116.50.214:5672//"
    CELERY_RESULT_BACKEND = "redis://:123456@114.116.50.214:6379/0"

    在app里的tasks.py里编写任务:

    from celery import shared_task
    
    @shared_task
    def add(x, y):
        return x+y
    
    @shared_task
    def mul(x, y):
        return x*y

    在views里调用celery task:

    from django.shortcuts import HttpResponse
    from app01 import tasks
    
    def test(request):
        result = tasks.add.delay(100, 100)
        return HttpResponse(result.get())

    定时任务

    https://celery.readthedocs.io/en/latest/userguide/periodic-tasks.html#using-custom-scheduler-classes

    1、安装 django-celery-beat

    pip3 install django-celery-beat
    

    2、在settings.py中设置

    INSTALLED_APPS = [
        ...,
        'django_celery_beat',
    ]

    3、进行数据库迁移,以便创建定时任务所需的表

    python3 manage.py migrate
    

    4、开始监测定时任务

    celery -A mysite.celeryapp beat -l info -S django

    5、在django-admin界面设置定时任务

  • 相关阅读:
    8.10
    今日头条笔试题 1~n的每个数,按字典序排完序后,第m个数是什么?
    Gym 100500B Conference Room(最小表示法,哈希)
    CodeForces 438D The Child and Sequence(线段树)
    UVALIVE 6905 Two Yachts(最小费用最大流)
    Gym Conference Room (最小表示法,哈希)
    hdu 2389 Rain on your Parade(二分图HK算法)
    Codeforces Fox And Dinner(最大流)
    zoj 3367 Counterfeit Money(dp)
    ZOJ3370. Radio Waves(2-sat)
  • 原文地址:https://www.cnblogs.com/believepd/p/10643392.html
Copyright © 2011-2022 走看看