zoukankan      html  css  js  c++  java
  • celery定时任务

    1,celery介绍
    Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务( async task )和定时任务( crontab )。 异步任务比如是发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作 ,定时任务是需要在特定时间执行的任务。它的架构组成如下图:



    任务队列
    任务队列是一种跨线程、跨机器工作的一种机制.
    任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.

    任务模块
    包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

    消息中间件 Broker
    Broker ,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。 Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

    任务执行单元 Worker
    Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

    任务结果存储 Backend
    Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。


    2.1 使用Celery实现异步任务

    a. 创建Celery实例
    b. 启动Celery Worker,通过delay()或者apply_async()将任务发布到broker
    c. 应用程序调用异步任务
    d. 存储结果
    Celery Beat: 任务调度器,Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列

    2.2 使用Celery定时任务

    a. 创建Celery实例
    b. 配置文件中配置任务,发送任务celery -A xxx beat
    c. 启动Celery Worker celery -A xxx worker -l info -P eventlet
    d. 存储结果

    3. 代码实现

    3.1 test1.py

    from .. import app
    import time
    
    def test11():
        time.sleep(1)
        print('test11')
    
    def test22():
        time.sleep(2)
        print('test22')
        test11()
    @app.task
    def test1_run():
      test11()
    test22()

    3.2 test2.py

    from .. import app
    import time
    
    def test33():
        time.sleep(3)
        print('test33')
    
    def test44():
        time.sleep(4)
        print('test44')
        test33()
    
    @app.task
    def test2_run():
        test33()
        test44() 

    3.3 celery_task.__init__.py

    # 拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句
    # 该代码中,名字是不一样的,最好也要不一样
    from __future__ import absolute_import
    from celery import Celery
    
    app = Celery('tasks')
    app.config_from_object('celery_task.celeryconfig') 

    3.4 celeryconfig.py

    from __future__ import absolute_import
    from celery.schedules import crontab
    from datetime import timedelta
    
    # 使用redis存储任务队列
    broker_url = 'redis://127.0.0.1:6379/7'
    # 使用redis存储结果
    result_backend = 'redis://127.0.0.1:6379/8'
    
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    # 时区设置
    timezone = 'Asia/Shanghai'
    # celery默认开启自己的日志
    # False表示不关闭
    worker_hijack_root_logger = False
    # 存储结果过期时间,过期后自动删除
    # 单位为秒
    result_expires = 60 * 60 * 24
    
    # 导入任务所在文件
    imports = [
        'celery_task.app_scripts.test1',
        'celery_task.app_scripts.test2'
    ]
    
    # 需要执行任务的配置
    beat_schedule = {
        'test1': {
            # 具体需要执行的函数
            # 该函数必须要使用@app.task装饰
            'task': 'celery_task.app_scripts.test1.test1_run',
            # 定时时间
            # 每分钟执行一次,不能为小数
            'schedule': crontab(minute='*/1'),
            # 或者这么写,每小时执行一次
            # "schedule": crontab(minute=0, hour="*/1")
            # 执行的函数需要的参数
            'args': ()
        },
        'test2': {
            'task': 'celery_task.app_scripts.test2.test2_run',
            # 设置定时的时间,10秒一次
            'schedule': timedelta(seconds=10),
            'args': ()
        }
    }
    
    如果大写的话,需要写成:
     1 CELERYBEAT_SCHEDULE = {
     2     'celery_app.task.task1': {
     3         'task': 'celery_app.task.task1',
     4         'schedule': timedelta(seconds=20),
     5         'args': (1, 10)
     6     },
     7     'celery_app.task.task2': {
     8         'task': 'celery_app.task.task2',
     9         'schedule': crontab(minute='*/2'),
    10         'args': ()
    11     }
    12 }

    4. 执行定时任务

    4.1 发布任务

    在celery_task同级目录下,执行命令:
    celery -A celery_task beat

    4.2 执行任务

    在celery_task同级目录下,执行命令:
    celery -A celery_task worker -l info -P eventlet
    可以看到输出:

    [2018-09-07 16:54:57,809: WARNING/MainProcess] test33
    [2018-09-07 16:55:00,002: INFO/MainProcess] Received task: celery_task.app_scrip
    ts.test1.test1_run[0134cb52-29a3-4f57-890e-9730feac19e7]
    [2018-09-07 16:55:01,069: WARNING/MainProcess] test11
    [2018-09-07 16:55:01,821: WARNING/MainProcess] test44
    [2018-09-07 16:55:03,083: WARNING/MainProcess] test22
    [2018-09-07 16:55:04,234: WARNING/MainProcess] test11
    

    如果同时在<b>两个虚拟环境(服务器)</b>中都执行定时任务,都可以看到有以上LOG打印。

    4.3 celery相关命令

    发布任务
    celery -A celery_task beat
    执行任务
    celery -A celery_task worker -l info -P eventlet
    将以上两条合并
    celery -B -A celery_task worker
    后台启动celery worker进程
    celery multi start work_1 -A appcelery
    停止worker进程,如果无法停止,加上-A
    celery multi stop WORKNAME
    重启worker进程
    celery multi restart WORKNAME
    查看进程数
    celery status -A celery_task

    4.4 定时方式

    from celery.schedules import crontab
    from datetime import timedelta
    
    # 1 每10秒钟执行一次
    'schedule':timedelta(seconds=30)
    
    # 2 每分钟执行一次
    'schedule':crontab(minute='*/1') 
    

     
  • 相关阅读:
    dockerfile 详解
    kubectl简介
    关于高级事件的使用
    关于拖拽延伸出来的一些效果
    照片墙的制作过程及其出现的问题
    关于360度全景照片的问题总结
    实战演练-记账本App (五)
    人月神话阅读笔记02
    实战演练-记账本App(四)
    实战演练-记账本App(三)
  • 原文地址:https://www.cnblogs.com/kaishirenshi/p/12337211.html
Copyright © 2011-2022 走看看