zoukankan      html  css  js  c++  java
  • Celery-定时任务

    1. Celery Beat是什么?

      celery beat是一个调度程序,它定期启动任务,然后由集群中的可用工作程序节点执行任务。

      默认情况下,条目是从beat_schedule设置中获取的,但也可以使用自定义存储,例如将条目存储在SQL数据库中。

      必须确保一次只有一个调度程序针对一个调度任务运行,否则最终将导致重复的任务。使用集中式方法意味着时间表不必同步,并且服务可以在不使用锁的情况下运行。

    2. 在celery beat中添加任务

       要定期调用任务,您必须在Beat时间表列表中添加一个条目

    tasks.py

    from celery import Celery
    from celery.schedules import crontab app
    = Celery('tasks', broker='pyamqp://celery:celery@192.168.0.12:5672/celery_vhost',backend='redis://localhost:6379/0') #app = Celery('tasks', backend='redis://localhost', broker='pyamqp://') app.conf.update( task_serializer='json', accept_content=['json'], # Ignore other content result_serializer='json', timezone='Asia/Shanghai', enable_utc=True, ) @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # Calls add(2,2) every 30 seconds sender.add_periodic_task(30.0, add.s(2,2), expires=10) # Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg) @app.task def add(x, y): return x + y

    3. 启用Celery Beat

    Beat需要将任务的最后运行时间存储在本地数据库文件(默认情况下命名为celerybeat-schedule)中,

    因此它需要访问权限才能在当前目录中进行写操作,或者可以为此文件指定一个自定义位置:

    celery -A tasks beat -s /var/run/celery/celerybeat-schedule

    然后在另一个终端启用worker 

    celery -A tasks worker -l info

    可以看见日志:

    [2019-10-24 14:45:53,448: INFO/ForkPoolWorker-4] Task tasks.add[e028900c-f2a3-468e-8cb8-4ae72d0e77fe] succeeded in 0.0020012762397527695s: 4
    [2019-10-24 14:46:03,370: INFO/MainProcess] Received task: tasks.test[0635b276-19c9-4d76-9941-dbe9e7320a7f]
    [2019-10-24 14:46:03,372: WARNING/ForkPoolWorker-6] hello
    [2019-10-24 14:46:03,374: INFO/ForkPoolWorker-6] Task tasks.test[0635b276-19c9-4d76-9941-dbe9e7320a7f] succeeded in 0.0021341098472476006s: None
    [2019-10-24 14:46:13,371: INFO/MainProcess] Received task: tasks.test[afcfa84c-3a3b-48bf-9191-59ea55b08eea]
    [2019-10-24 14:46:13,373: WARNING/ForkPoolWorker-8] hello
    [2019-10-24 14:46:13,375: INFO/ForkPoolWorker-8] Task tasks.test[afcfa84c-3a3b-48bf-9191-59ea55b08eea] succeeded in 0.002273786813020706s: None

    也可以通过启用workers  -B选项beat嵌入到worker中,

    如果永远不会运行一个以上的worker节点,这很方便,但是它并不常用,因此不建议用于生产环境:

    celery -A tasks worker -B -l info 
    

      

      

  • 相关阅读:
    pytest.mark.parametrize里面indirect参数详细解释
    linux环境安装python环境
    gitlab怎么给别人新增项目权限
    VMware虚拟机下的CentOS7如果Ping不通百度,解决办法
    ip configuration could not be reserved (no available address timeout etc.)虚拟机连接不上网卡解决办法
    虚拟机安装教程
    auto_now与auto_now_add之间的区别
    【二分答案】洛谷P2678 [NOIP2015 提高组] 跳石头/P1824 进击的奶牛/P2440木材加工/P1873 砍树
    团体程序设计天梯赛PTA L2-021点赞狂魔
    团体程序设计天梯赛PTA L2-020功夫传人
  • 原文地址:https://www.cnblogs.com/zydev/p/11732129.html
Copyright © 2011-2022 走看看