zoukankan      html  css  js  c++  java
  • Celery配置实践笔记

    说点什么:

    整理下工作中配置celery的一些实践,写在这里,一方面是备忘,另外一方面是整理成文档给其他同事使用。

    演示用的项目,同时也发布在Github上: https://github.com/blackmatrix7/celery-demo

    这份笔记会随着经验的积累,逐步调整完善,不过通常情况下,Github上的更新会比较快。


    Celery 配置实践笔记

    Celery 配置实践笔记,目前已记录:

    • 异步执行任务
    • 为不同任务分配不同的队列
    • 计划任务

    需要补充:

    • 为不同的任务配置不同的优先级
    • Celey任务的返回结果

    创建Celery

    配置Celery参数

    在创建celery实例之前,需要对celery的参数进行一些配置。

    在这里列出一些比较常用的Celery配置项:

    配置项名称 说明
    CELERY_DEFAULT_QUEUE 默认的队列名称,当没有为task特别指定队列时,采用此队列
    CELERY_BROKER_URL 消息代理,用于发布者传递消息给消费者,推荐RabbitMQ
    CELERY_RESULT_BACKEND 后端,用于存储任务执行结果,推荐redis
    CELERY_TASK_SERIALIZER 任务的序列化方式
    CELERY_RESULT_SERIALIZER 任务执行结果的序列化方式
    CELERY_ACCEPT_CONTENT
    CELERYD_CONCURRENCY 任务消费者的并发数
    CELERY_TIMEZONE 时区设置,计划任务需要,推荐 Asia/Shanghai
    CELERY_QUEUES Celery队列设定
    CELERY_ROUTES Celery路由设定,用于给不同的任务指派不同的队列
    CELERYBEAT_SCHEDULE Celery计划任务设定

    为了使用方便,在这里引入了之前写的config模块(支持本地配置不提交到github),本身是非必须的,用dict存储celery配置一样可以。所以在下面的示例代码中,会以注释的形式,演示不使用config模块实现celery配置。

    # 等价于直接定义一个broker,如
    # broker = 'amqp://user:password@127.0.0.1:5672//'
    broker = current_config.CELERY_BROKER_URL
    # 将config模块的配置转换成dict,等价于直接通过dict定义一组celery配置
    celery_conf = {k: v for k, v in current_config.items()}
    

    创建celery实例

    将之前创建的broker和celery_conf传递给celery,用于创建celery实例。

    manage.py

    # 创建celery实例
    celery = Celery('demo',  broker=broker)
    # 载入celery配置
    celery.config_from_object(celery_conf)
    

    定义模拟函数

    在项目的handlers下,定义一些用来异步任务或计划任务的调用的函数。

    这里定义了两个py文件,分别是async_tasks.pyschedules.py,前者用于异步任务,后者用于计划任务。

    异步任务下有两个函数,模拟发送邮件和推送消息;计划任务下有数个根据计划任务执行事件定义的模拟函数。

    创建celery worker

    推荐以celery.start的方式来启动celery

    启动参数

    在创建完celery实例后,调用celery实例的start方法,来启动celery。

    在demo中,在manage.py中创建celery实例。

    celery = Celery('demo',  broker=broker)
    celery.config_from_object(celery_conf)
    # argv中传入启动celery所需的参数
    celery.start(argv=['celery', 'worker', '-l', 'info', '-f', 'logs/celery.log'])
    

    第一个参数是固定的,用于启动celery

    第二个参数是启动的celery组件,这里启动的是worker,用于执行任务

    第三个参数和第四个参数为一组,指定日志的级别,这里记录级别为info的日子

    第五个参数和第六个参数为一组,指定日志文件的位置,这里将日志记录在log/celery.log

    指定消费的队列

    在启动命令中,增加一个-Q的参数,用于指定消费的队列。

    celery.start(argv=['celery', 'worker', '-Q', 'message_queue', '-l', 'info', '-f', 'logs/message.log'])
    

    上述的参数中,'-Q', 'message_queue'两个参数,是指定这个worker消费名为“message_queue”的队列。

    创建异步任务

    对于需要在celery中异步执行的函数,只需要在函数上增加一个装饰器。

    # 导入manage.py中创建的celery实例
    from manage import celery
    
    # 需要在celery中异步执行的函数,增加celery.task装饰器
    @celery.task
    def async_send_email(send_from, send_to, subject, content):
        """
        忽略方法实现
        """
        pass
    

    异步执行任务

    对于以celery.task装饰的函数,可以同步运行,也可以异步运行。

    接上一个例子的async_send_email函数。

    同步执行时,直接调用这个函数即可。

    async_push_message(send_to=['张三', '李四'], content='老板喊你来背锅了')
    async_push_message(send_to=['刘五', '赵六'], content='老板喊你来领奖金了')
    

    需要异步执行时,调用函数的delay()方法执行,此时会将任务委托给celery后台的worker执行。

    async_push_message.delay(send_to=['张三', '李四'], content='老板喊你来背锅了')
    async_push_message.delay(send_to=['刘五', '赵六'], content='老板喊你来领奖金了')
    

    为任务指定不同队列

    其实在之前配置celery参数时,就已经实现为不同任务指定不同队列的目标。

    在这里详细说明下配置项:

    # 导入Queue
    from kombu import Queue
    # 导入Task所在的模块,所有使用celery.task装饰器装饰过的函数,所需要把所在的模块导入
    # 我们之前创建的几个测试用函数,都在handlers.async_tasks和handlers.schedules中
    # 所以在这里需要导入这两个模块,以str表示模块的位置,模块组成tuple后赋值给CELERY_IMPORTS
    # 这样Celery在启动时,会自动找到这些模块,并导入模块内的task
    CELERY_IMPORTS = ('handlers.async_tasks', 'handlers.schedules')
    # 为Celery设定多个队列,CELERY_QUEUES是个tuple,每个tuple的元素都是由一个Queue的实例组成
    # 创建Queue的实例时,传入name和routing_key,name即队列名称
    CELERY_QUEUES = (
        Queue(name='email_queue', routing_key='email_router'),
        Queue(name='message_queue', routing_key='message_router'),
        Queue(name='schedules_queue', routing_key='schedules_router'),
    )
    # 最后,为不同的task指派不同的队列
    # 将所有的task组成dict,key为task的名称,即task所在的模块,及函数名
    # 如async_send_email所在的模块为handlers.async_tasks
    # 那么task名称就是handlers.async_tasks.async_send_email
    # 每个task的value值也是为dict,设定需要指派的队列name,及对应的routing_key
    # 这里的name和routing_key需要和CELERY_QUEUES设定的完全一致
    CELERY_ROUTES = {
        'handlers.async_tasks.async_send_email': {
            'queue': 'email_queue',
            'routing_key': 'vcan.email_router',
        },
        'handlers.async_tasks.async_push_message': {
            'queue': 'message_queue',
            'routing_key': 'message_router',
        },
        'handlers.schedules.every_30_seconds': {
            'queue': 'schedules_queue',
            'routing_key': 'schedules_router',
        }
    }
    

    配置完成后,不同的task会根据CELERY_ROUTES的设置,指派到不同的消息队列。

    计划任务

    celery除用于异步执行任务外,还可以用于执行计划任务。

    设定celery时区

    在开始配置计划任务之前对配置文件中设定celery时区。

    # 设定 Celery 时区
    CELERY_TIMEZONE = 'Asia/Shanghai'
    

    定义计划任务

    在配置文件中定义计划任务

    计划任务也是异步执行任务的一种方式,所以也需要参考之前的说明,为计划任务分配不同的队列。

    在配置文件中,新增一项CELERYBEAT_SCHEDULE的配置

    from datetime import timedelta
    from celery.schedules import crontab
    
    CELERYBEAT_SCHEDULE = {
    	# 给计划任务取一个独一无二的名字吧
        'every-30-seconds': {
        	# task就是需要执行计划任务的函数
             'task': 'handlers.schedules.every_30_seconds',
             # 配置计划任务的执行时间,这里是每30秒执行一次
             'schedule': timedelta(seconds=30),
             # 传入给计划任务函数的参数
             'args': {'value': '2333333'}
        }
    }
    

    CELERYBEAT_SCHEDULE是一个dict,每个key为计划任务的名称,value也是dict,包含task、schedule、args。

    task即需要执行计划任务的函数,这里是handlers.schedules.every_30_seconds,即handlers的schedules的every_30_seconds函数,如需配置其他的函数,依照此规则定义即可。

    args是传递给计划任务函数的参数,在这个例子中,即传递给every_30_seconds的参数,如果无需参数,则args配置为None。

    schedule即配置计划任务的执行时间,例子中使用的是timedelta实例,用于实现固定间隔某些时间执行。除此之外,还可以设定某个时间点执行,或重复某个时间点执行。这个就需要用到celery的crontab类。

    'push_occupancy_rates': {
                'task': 'handlers.schedules.test_func_b',
                # 每天中午12点执行
                'schedule': crontab(hour='12', minute='0'),
                'args': None
            },
    

    关于crontab更详细的配置方式,可以参考官方手册

    Example Meaning
    crontab() Execute every minute.
    crontab(minute=0, hour=0) Execute daily at midnight.
    crontab(minute=0, hour='*/3') Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
    crontab(minute=0, hour='0,3,6,9,12,15,18,21') Same as previous.
    crontab(minute='*/15') Execute every 15 minutes.
    crontab(day_of_week='sunday') Execute every minute (!) at Sundays.
    crontab(minute='', hour='', day_of_week='sun') Same as previous.
    crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
    crontab(minute=0,hour='/2,/3') Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
    crontab(minute=0, hour='*/5') Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
    crontab(minute=0,hour='*/3,8-17') Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
    crontab(0, 0,day_of_month='2') Execute on the second day of every month.
    crontab(0, 0, day_of_month='2-30/3') Execute on every even numbered day.
    crontab(0, 0, day_of_month='1-7,15-21') Execute on the first and third weeks of the month.
    crontab(0, 0,day_of_month='11', month_of_year='5') Execute on the eleventh of May every year.
    crontab(0, 0, month_of_year='*/3') Execute on the first month of every quarter.
  • 相关阅读:
    Postman使用教程
    CAD和ArcGIS转换 矢量配准
    SAP CRM Advanced search和Simple search里Max hit表现行为的差异
    SAP CRM Product simple search的启用步骤
    如何快速定位SAP CRM订单应用(Order Application)错误消息抛出的准确位置
    如何动态修改SAP CRM WebClient UI表格栏的宽度
    如何在SAP CRM WebClient UI里创建web service并使用ABAP消费
    如何处理SAP CRM Web Service错误
    如何使用SAP CRM WebClient UI实现一个类似新浪微博的字数统计器
    如何开启SAP CRM基于WORD模板创建附件的功能
  • 原文地址:https://www.cnblogs.com/blackmatrix/p/8950381.html
Copyright © 2011-2022 走看看