zoukankan      html  css  js  c++  java
  • 用正确的姿势使用 celery

    简介

    celery 是一个简单、灵活、可靠的分布式系统,可以处理大量的消息。
    celery 是一个任务队列,关注实时处理,同时支持任务调度。

    它的工作机制是这样的:

    快速使用

    安装:pip3 install celery

    # proj.py
    from celery import Celery
    
    app = Celery("hello", broker="amqp://guest@localhost//")
    
    @app.task
    def hello():
        return "hello world"
    

    启动 worker

    // 启动 worker
    $ celery -A proj worker -l info
    
     -------------- celery@bogon v4.2.2 (windowlicker)
    ---- **** ----- 
    --- * ***  * -- macOS-10.15.1-x86_64-i386-64bit 2019-12-09 17:10:39
    -- * - **** --- 
    - ** ---------- [config]
    - ** ---------- .> app:         hello:0x10b4bc640
    - ** ---------- .> transport:   amqp://guest:**@localhost:5672//
    - ** ---------- .> results:     disabled://
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
                    
    # 标注 celery 获取的任务列表
    [tasks]
      . proj.hello
    
    [2019-12-09 17:10:39,731: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
    [2019-12-09 17:10:39,750: INFO/MainProcess] mingle: searching for neighbors
    [2019-12-09 17:10:40,788: INFO/MainProcess] mingle: all alone
    [2019-12-09 17:10:40,802: INFO/MainProcess] celery@bogon ready.
    
    

    任务派发

    >>> from proj import hello
    >>> hello.delay()
    <AsyncResult: 5e9ad990-4d09-4ed3-9b90-a1ff00d9020f>
    

    任务接收

    [2019-12-09 17:12:06,720: INFO/MainProcess] Received task: proj.hello[5e9ad990-4d09-4ed3-9b90-a1ff00d9020f]  
    [2019-12-09 17:12:06,723: INFO/ForkPoolWorker-8] Task proj.hello[5e9ad990-4d09-4ed3-9b90-a1ff00d9020f] succeeded in 0.00046249000000386786s: 'hello world'
    

    这样就成功使用了 celery

    异步任务&定时任务

    异步任务

    在一些场景中,可能会涉及到发短信、邮件和一些比较耗时的任务,我们期望用户在使用的时候没有延时的效果,我们可以这样做.

    A:先增加一个发送邮件的任务

    @app.task
    def send_mail():
        # 伪造:发送邮件
        time.sleep(3)  # 模拟耗时
        print("Hello:xxxxx")
        return 1
    

    B:任务派发

    >>> from proj import send_mail
    >>> res = send_mail.delay()  # 这一步不用等待,即可执行以下代码
    # 查看任务状态
    >>> res.status
    'PENDING'   # 表示任务尚未执行完成
    >>> res.status
    'SUCCESS'  # 任务执行成功
    >>> res.result   # 获取任务结果,也是 return 的值
    1
    

    C:任务查看

    # 接收任务
    [2019-12-09 17:36:41,369: INFO/MainProcess] Received task: proj.send_mail[cb6907f1-c65a-42af-a01e-0bd7af51140f]  
    [2019-12-09 17:36:44,372: WARNING/ForkPoolWorker-8] Hello:xxxxx
    # 执行共耗时时间及结果值
    [2019-12-09 17:36:44,391: INFO/ForkPoolWorker-8] Task proj.send_mail[cb6907f1-c65a-42af-a01e-0bd7af51140f] succeeded in 3.0201863589999967s: 1
    

    如果获取任务状态及结果报错,需要配置 celery 实例的 backend 属性,来完成任务结果的存储

    定时任务

    在一些场景中,比如可能需要给用户来一份生日关怀、每天发一封给自己发一封天气预报...我们可以这样做.

    • 在添加定时任务的时候,需要使用到 celery beat 这样的一个调度器,由它来定期启动任务,派发给工作者去执行这些任务。
    • 默认情况下,任务来自 beat_schedule 设置,但是也可以使用自定义存储,比如将任务存储在SQL数据库中。
    • 要确保 celery beat 这个调度器只运行这一个,否则你可能会得到重复的任务。
    • Using a centralized approach means the schedule doesn’t have to be synchronized, and the service can operate without using locks.

    A:配置一个发生日关怀的任务

    @app.task
    def send_birthday_wish(name):
        # 伪造:发送生日祝福
        print(f"{name}, Happy birthday to you ~ ")
        return 1
    

    B:配置何时执行

    因处于东8区,先改下时区的配置

    app.conf.timezone = "Asia/Shanghai"
    

    配置任务

    # 第一种配置方式 
    ...
    from celery.schedules import crontab
    
    ...
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 每 10s 发送一个生日祝福
        sender.add_periodic_task(10.0, send_birthday_wish.s('liuzhichao'), name='add every 10')
    
        # 每 30s 发送一个生日祝福
        sender.add_periodic_task(30.0, send_birthday_wish.s('jenny'), expires=10)
    
        # 每周一 7:30 发送一个生日祝福
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            send_birthday_wish.s('xx'),
        )
    
    # 第二种配置方式 
    app.conf.beat_schedule = { 
        # 每 30s 发送一次生日祝福
        'add-every-30-seconds': {
            'task': 'proj.send_birthday_wish',
            'schedule': 10.0,
            'args': ('liuzhichao', )
        },
        # 每年 7月17号 9点30分 发送生日祝福
        'add-every-year':  {
            'task': 'proj.send_birthday_wish',
            'schedule': crontab(
                minute=30, hour=9, day_of_month=17, month_of_year=7
            ),
            'args': ('liuzhichao', )
        }
    }
    

    C:启动定时任务

    # 启动定时任务服务
    $ celery -A proj beat
    OR
    # 可以通过激活 workers -B 选项将 beat 嵌入到 worker 中,如果你永远不会运行一个以上的 worker 节点,这是很方便的,但是它不常用,因此不推荐在生产中使用。
    $ celery -A proj worker -B
    

    更多的属性(HIGH API)

    eta & countdown & expires

    retry & ignore_result

    组操作

    更多参考

    请点击这里

    任务派发的三种方式

    框架集成

    django及定时任务的配置

    其它框架集成

    高可用

    celery 进程挂掉

  • 相关阅读:
    Python命名空间的本质
    Jetty架构解析及应用示例
    PRML读书会第一章 Introduction(机器学习基本概念、学习理论、模型选择、维灾等)
    PRML读书会第三章 Linear Models for Regression(线性基函数模型、正则化方法、贝叶斯线性回归等)
    PRML读书会第二章 Probability Distributions(贝塔二项式、狄利克雷多项式共轭、高斯分布、指数族等)
    PRML读书会第四章 Linear Models for Classification(贝叶斯marginalization、Fisher线性判别、感知机、概率生成和判别模型、逻辑回归)
    Mac技巧合集第二期
    Mac技巧合集第三期
    C/Java/Python/ObjectiveC在OS X上的性能实验
    程序猿崛起——Growth Hacker
  • 原文地址:https://www.cnblogs.com/leguan1314/p/11909890.html
Copyright © 2011-2022 走看看