结合django的定时任务
应用场景:
1、异步执行,如耗时任务;
2、定时任务
broker:redis
执行任务:worker
celery_pro目录:
celeryf.py __init__.py config.py tasks1.py tasks2.py
celeryf.py
from __future__ import absolute_import,unicode_literals from celery import Celery app = Celery( main='mycelery', #主模块名 broker='redis://:lyb@localhost:6379/0', #消息代理 backend='redis://:lyb@localhost:6379/0', #结果存储 include=['celery_pro.tasks1','celery_pro.tasks2'], #定义worker的文件,不含.py ) app.config_from_object('celery_pro.config') #读取配置文件,可以是导入的模块,也可以是点分隔的文件字符串表示 if __name__ == '__main__': app.start()
__init__.py #空
config.py #http://docs.celeryproject.org/en/latest/userguide/configuration.html
import os from datetime import timedelta from celery.schedules import crontab BASE_DIR = os.path.abspath('') # PATH_TEST = 'path_test' CELERY_ENABLE_UTC = False # 不是用UTC CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务结果的时效时间 # CELERYD_LOG_FILE = BASE_DIR + "/log/celery/celery.log" # log路径 # CELERYBEAT_LOG_FILE = BASE_DIR + "/log/celery/beat.log" # beat log路径 # CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] # 允许接受的格式 一般采用msgpack(快)和json(跨平台) # CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果格式 # CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化反序列化采用msgpack CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'celery_pro.tasks2.add_2', 'schedule': 30.0, 'args': (16, 16) }, 'add-every-10-seconds': { 'task': 'celery_pro.tasks1.test', 'schedule': 10.0, 'args': ['nihao'] }, # Executes every Monday morning at 7:30 a.m. 'add-every-monday-morning': { 'task': 'celery_pro.tasks2.add_2', 'schedule': crontab(day_of_week='monday'), 'args': (16, 16), }, }
tasks1.py
from __future__ import absolute_import,unicode_literals from celery.schedules import crontab from .celeryf import app #1、第一种形式 # @app.on_after_configure.connect # def setup_periodic_tasks(sender, **kwargs): # # Calls test('hello') every 10 seconds. # sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # # # Calls test('world') every 30 seconds # sender.add_periodic_task(30.0, test.s('world'), expires=10) # # # Executes every Monday morning at 7:30 a.m. # sender.add_periodic_task( # crontab(hour=7, minute=30, day_of_week=1), # test.s('Happy Mondays!'), # ) #2、第二种形式 # app.conf.beat_schedule = { # 'add-every-30-seconds': { # 'task': 'celery_pro.tasks2.add_2', # 'schedule': 30.0, # 'args': (16, 16) # }, # 'add-every-10-seconds': { # 'task': 'celery_pro.tasks1.test', # 'schedule': 10.0, # 'args': ['nihao'] # }, # # Executes every Monday morning at 7:30 a.m. # 'add-every-monday-morning': { # 'task': 'celery_pro.tasks2.add_2', # 'schedule': crontab(), # 'args': (16, 16), # }, # } @app.task def test(arg): print(arg)
tasks2.py
from __future__ import unicode_literals,absolute_import from .celery import app import time @app.task def add_2(x,y): print('x + y :') return x+y @app.task def mul(x,y): time.sleep(20) return time.time()
启动worker:
(venv) l@l:~/myfile/pycharm/py3$ python celery_pro/celeryf.py worker -l debug
启动celery beat即调度器:
celery -A celery_pro.tasks1 beat -l debug