Celery
官方
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
专注于实时处理的异步任务队列
同时也支持任务调度
注意:
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
Celery异步任务框架
"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""
Celery架构
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
使用场景
异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务,比如每天数据统计
Celery的安装配置
pip install celery
消息中间件:RabbitMQ/Redis
app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)
两种celery任务结构:提倡用包管理,结构更清晰
# 如果 Celery对象:Celery(...) 是放在一个模块下的 # 1)终端切换到该模块所在文件夹位置:scripts # 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info # 注:模块名随意 # 如果 Celery对象:Celery(...) 是放在一个包下的 # 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中 # 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info # 注:包名随意
Celery执行异步任务
基本结构
# 创建py文件:celery_app_task.py import celery import time # broker='redis://127.0.0.1:6379/2' 不加密码 backend='redis://:123456@127.0.0.1:6379/1' broker='redis://:123456@127.0.0.1:6379/2' cel=celery.Celery('test',backend=backend,broker=broker) @cel.task def add(x,y): return x+y
包架构封装(多任务结构)
project ├── celery_task # celery包 │ ├── __init__.py # 包文件 │ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py │ └── tasks.py # 所有任务函数 ├── add_task.py # 添加任务 └── get_result.py # 获取结果
基本使用
# 1)创建app + 任务 # 2)启动celery(app)服务: # 非windows # 命令:celery worker -A celery_task -l info # windows: # pip3 install eventlet # celery worker -A celery_task -l info -P eventlet # 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本 # 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本 from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
from .celery import app import time @app.task def add(n, m): print(n) print(m) time.sleep(10) print('n+m的结果:%s' % (n + m)) return n + m @app.task def low(n, m): print(n) print(m) print('n-m的结果:%s' % (n - m)) return n - m
from celery_task import tasks # 添加立即执行任务 t1 = tasks.add.delay(10, 20) t2 = tasks.low.delay(100, 50) print(t1.id) # 添加延迟任务 from datetime import datetime, timedelta eta=datetime.utcnow() + timedelta(seconds=10) tasks.low.apply_async(args=(200, 50), eta=eta)
from celery_task.celery import app from celery.result import AsyncResult id = '01e57ace-03b2-483b-9056-8e7289537e07' if __name__ == '__main__': async_result=AsyncResult(id=id, app=app) if async_result.successful(): result = async_result.get() print(result) elif async_result.failed(): print('任务失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
高级使用
# 1)创建app + 任务 # 2)启动celery(app)服务: # 非windows # 命令:celery worker -A celery_task -l info # windows: # pip3 install eventlet # celery worker -A celery_task -l info -P eventlet # 3)添加任务:自动添加任务,所以要启动一个添加任务的服务 # 命令:celery beat -A celery_task -l info # 4)获取结果 from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'low-task': { 'task': 'celery_task.tasks.low', 'schedule': timedelta(seconds=3), # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': (300, 150), } }
from .celery import app import time @app.task def add(n, m): print(n) print(m) time.sleep(10) print('n+m的结果:%s' % (n + m)) return n + m @app.task def low(n, m): print(n) print(m) print('n-m的结果:%s' % (n - m)) return n - m
from celery_task.celery import app from celery.result import AsyncResult id = '01e57ace-03b2-483b-9056-8e7289537e07' if __name__ == '__main__': async_result=AsyncResult(id=id, app=app) if async_result.successful(): result = async_result.get() print(result) elif async_result.failed(): print('任务失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
django中使用
""" celery框架django项目工作流程 1)加载django配置环境 2)创建Celery框架对象app,配置broker和backend,得到的app就是worker 3)给worker对应的app添加可处理的任务函数,用include配置给worker的app 4)完成提供的任务的定时配置app.conf.beat_schedule 5)启动celery服务,运行worker,执行任务 6)启动beat服务,运行beat,添加任务 重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下 """ # 一、加载django配置环境 import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev") # 二、加载celery配置环境 from celery import Celery # broker broker = 'redis://127.0.0.1:6379/0' # backend backend = 'redis://127.0.0.1:6379/1' # worker app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'update-banner-list': { 'task': 'celery_task.tasks.update_banner_list', 'schedule': timedelta(seconds=10), 'args': (), } }
from .celery import app from django.core.cache import cache from home import models, serializers from django.conf import settings @app.task def update_banner_list(): queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT] banner_list = serializers.BannerSerializer(queryset, many=True).data # 拿不到request对象,所以头像的连接base_url要自己组装 for banner in banner_list: banner['image'] = 'http://127.0.0.1:8000%s' % banner['image'] cache.set('banner_list', banner_list, 86400) return True
一、什么是Celery
1、celery是什么
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
另外, Celery还支持不同的并发和序列化的手段
- 并发:Prefork, Eventlet, gevent, threads/single threaded
- 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
2、使用场景
celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
3、Celery具有以下优点
Simple(简单)
Celery 使用和维护都非常简单,并且不需要配置文件。
Highly Available(高可用)
woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。
Fast(快速)
单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)
Flexible(灵活)
Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。
4、Celery安装
你可以安装Celery通过Python包管理平台(PyPI)或者源码安装
使用pip安装:
1
|
$ pip install - U Celery |
或着:
1
|
$ sudo easy_install Celery |
二、Celery执行异步任务
1、基本使用
创建项目celerypro
创建异步任务执行文件celery_task:
import celery import time backend='redis://127.0.0.1:6379/1' broker='redis://127.0.0.1:6379/2' cel=celery.Celery('test',backend=backend,broker=broker) @cel.task def send_email(name): print("向%s发送邮件..."%name) time.sleep(5) print("向%s发送邮件完成"%name) return "ok"
创建执行任务文件,produce_task.py:
from celery_task import send_email result = send_email.delay("yuan") print(result.id) result2 = send_email.delay("alex") print(result2.id)
注意,异步任务文件命令执行:
1
|
celery worker - A celery_task - l info |
创建py文件:result.py,查看任务执行结果,
from celery.result import AsyncResult from celery_task import cel async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除 elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
2、多任务结构
celery.py:
from celery import Celery cel = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类 include=['celery_tasks.task01', 'celery_tasks.task02' ]) # 时区 cel.conf.timezone = 'Asia/Shanghai' # 是否使用UTC cel.conf.enable_utc = False
task01.py,task02.py:
#task01 import time from celery_tasks.celery import cel @cel.task def send_email(res): time.sleep(5) return "完成向%s发送邮件任务"%res #task02 import time from celery_tasks.celery import cel @cel.task def send_msg(name): time.sleep(5) return "完成向%s发送短信任务"%name
produce_task.py:
from celery_tasks.task01 import send_email from celery_tasks.task02 import send_msg # 立即告知celery去执行test_celery任务,并传入一个参数 result = send_email.delay('yuan') print(result.id) result = send_msg.delay('yuan') print(result.id)
check_result.py:
from celery.result import AsyncResult from celery_tasks.celery import cel async_result = AsyncResult(id="562834c6-e4be-46d2-908a-b102adbbf390", app=cel) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除,执行完成,结果不会自动删除 # async.revoke(terminate=True) # 无论现在是什么时候,都要终止 # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。 elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行')
开启work:celery worker -A celery_tasks -l info -P eventlet,添加任务(执行produce_task.py),检查任务执行结果(执行check_result.py)
三、Celery执行定时任务
简单结构中:设定时间让celery执行一个定时任务,produce_task.py: 其他文件不变
from celery_task import send_email from datetime import datetime # 方式一 # v1 = datetime(2020, 3, 11, 16, 19, 00) # print(v1) # v2 = datetime.utcfromtimestamp(v1.timestamp()) # print(v2) # result = send_email.apply_async(args=["egon",], eta=v2) # print(result.id) # 方式二 ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay # 使用apply_async并设定时间 result = send_email.apply_async(args=["egon"], eta=task_time) print(result.id)
多任务结构中:celery.py修改如下: 不需要生产者produce_task.py直接用命令:celery beat -A celery_tasks
from datetime import timedelta from celery import Celery from celery.schedules import crontab cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[ 'celery_tasks.task01', 'celery_tasks.task02', ]) cel.conf.timezone = 'Asia/Shanghai' cel.conf.enable_utc = False cel.conf.beat_schedule = { # 名字随意命名 'add-every-10-seconds': { # 执行tasks1下的test_celery函数 'task': 'celery_tasks.task01.send_email', # 每隔2秒执行一次 # 'schedule': 1.0, # 'schedule': crontab(minute="*/1"), 'schedule': timedelta(seconds=6), # 传递参数 'args': ('张三',) }, # 'add-every-12-seconds': { # 'task': 'celery_tasks.task01.send_email', # 每年4月11号,8点42分执行 # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'args': ('张三',) # }, }
1
2
3
|
# 启动 Beat 程序$ celery beat -A celery_tasks # Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列 # 之后启动 worker 进程.$ celery -A celery_tasks worker -l info 或者$ celery -B -A celery_tasks worker -l info |
celery -A celery_tasks worker -l info -c 5 # -c 5表示并发数5个
四、Django中使用celery
项目根目录创建celery包,目录结构如下:
mycelery/ ├── config.py ├── __init__.py ├── main.py └── sms/ ├── __init__.py ├── tasks.py
配置文件config.py:
broker_url = 'redis://127.0.0.1:6379/15' result_backend = 'redis://127.0.0.1:6379/14'
任务文件tasks.py:
# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!! from mycelery.main import app import time import logging log = logging.getLogger("django") @app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名 def send_sms(mobile): """发送短信""" print("向手机号%s发送短信成功!"%mobile) time.sleep(5) return "send_sms OK" @app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名 def send_sms2(mobile): print("向手机号%s发送短信成功!" % mobile) time.sleep(5) return "send_sms2 OK"
最后在main.py主程序中对django的配置文件进行加载
# 主程序 import os from celery import Celery # 创建celery实例对象 app = Celery("sms") # 把celery和django进行组合,识别和加载django的配置文件 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev') # 通过app对象加载配置 app.config_from_object("mycelery.config") # 加载任务 # 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称 # app.autodiscover_tasks(["任务1","任务2"]) app.autodiscover_tasks(["mycelery.sms",]) # 启动Celery的命令 # 强烈建议切换目录到mycelery根目录下启动 # celery -A mycelery.main worker --loglevel=info
Django视图调用:
from django.shortcuts import render # Create your views here. from django.shortcuts import render,HttpResponse from mycelery.sms.tasks import send_sms,send_sms2 from datetime import timedelta from datetime import datetime def test(request): ################################# 异步任务 # 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决 # send_sms.delay("110") # send_sms2.delay("119") # send_sms.delay() 如果调用的任务函数没有参数,则不需要填写任何内容 ################################# 定时任务 # ctime = datetime.now() # # 默认用utc时间 # utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # time_delay = timedelta(seconds=10) # task_time = utc_ctime + time_delay # result = send_sms.apply_async(["911", ], eta=task_time) # print(result.id) return HttpResponse('ok')
基本结构
Celery是由Python开发的一个简单、灵活、可靠的处理大量任务的分发系统,它不仅支持实时处理也支持任务调度。
- user:用户程序,用于告知celery去执行一个任务。
- broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
- worker:执行任务
celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
环境准备:
- 安装rabbitMQ或Redis
见:http://www.cnblogs.com/wupeiqi/articles/5132791.html - 安装celery
pip3 install celery
快速上手
import time from celery import Celery app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379') @app.task def xxxxxx(x, y): time.sleep(10) return x + y
#!/usr/bin/env python # -*- coding:utf-8 -*- from s1 import xxxxxx # 立即告知celery去执行xxxxxx任务,并传入两个参数 result = xxxxxx.delay(4, 4) print(result.id)
from celery.result import AsyncResult from s1 import app async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除 elif async.failed(): print('执行失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
执行 s1.py 创建worker(终端执行命令):
1
|
celery worker - A s1 - l info |
执行 s2.py ,创建一个任务并获取任务ID:
1
|
python3 s2.py |
执行 s3.py ,检查任务状态并获取结果:
1
|
python3 s3.py |
多任务结构
1
2
3
4
5
6
|
pro_cel ├── celery_tasks # celery相关文件夹 │ ├── celery.py # celery连接和配置相关文件 │ └── tasks.py # 所有任务函数 ├── check_result.py # 检查结果 └── send_task.py # 触发任务 |
#!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery celery = Celery('xxxxxx', broker='redis://192.168.0.111:6379', backend='redis://192.168.0.111:6379', include=['celery_tasks.tasks']) # 时区 celery.conf.timezone = 'Asia/Shanghai' # 是否使用UTC celery.conf.enable_utc = False
#!/usr/bin/env python # -*- coding:utf-8 -*- import time from .celery import celery @celery.task def xxxxx(*args, **kwargs): time.sleep(5) return "任务结果" @celery.task def hhhhhh(*args, **kwargs): time.sleep(5) return "任务结果"
#!/usr/bin/env python # -*- coding:utf-8 -*- from celery.result import AsyncResult from celery_tasks.celery import celery async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除 elif async.failed(): print('执行失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
#!/usr/bin/env python # -*- coding:utf-8 -*- import celery_tasks.tasks # 立即告知celery去执行xxxxxx任务,并传入两个参数 result = celery_tasks.tasks.xxxxx.delay(4, 4) print(result.id)
更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html
定时任务
1. 设定时间让celery执行一个任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import datetime from celery_tasks.tasks import xxxxx """ from datetime import datetime v1 = datetime(2017, 4, 11, 3, 0, 0) print(v1) v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) """ ctime = datetime.datetime.now() utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) s10 = datetime.timedelta(seconds = 10 ) ctime_x = utc_ctime + s10 # 使用apply_async并设定时间 result = xxxxx.apply_async(args = [ 1 , 3 ], eta = ctime_x) print (result. id ) |
2. 类似于contab的定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
""" celery beat -A proj celery worker -A proj -l info """ from celery import Celery from celery.schedules import crontab app = Celery( 'tasks' , broker = 'amqp://47.98.134.86:5672' , backend = 'amqp://47.98.134.86:5672' , include = [ 'proj.s1' , ]) app.conf.timezone = 'Asia/Shanghai' app.conf.enable_utc = False app.conf.beat_schedule = { # 'add-every-10-seconds': { # 'task': 'proj.s1.add1', # 'schedule': 10.0, # 'args': (16, 16) # }, 'add-every-12-seconds' : { 'task' : 'proj.s1.add1' , 'schedule' : crontab(minute = 42 , hour = 8 , day_of_month = 11 , month_of_year = 4 ), 'args' : ( 16 , 16 ) }, } |
注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。
Flask中应用Celery
1
2
3
4
5
|
pro_flask_celery / ├── app.py ├── celery_tasks ├── celery.py └── tasks.py |
#!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask from celery.result import AsyncResult from celery_tasks import tasks from celery_tasks.celery import celery app = Flask(__name__) TASK_ID = None @app.route('/') def index(): global TASK_ID result = tasks.xxxxx.delay() # result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0)) TASK_ID = result.id return "任务已经提交" @app.route('/result') def result(): global TASK_ID result = AsyncResult(id=TASK_ID, app=celery) if result.ready(): return result.get() return "xxxx" if __name__ == '__main__': app.run()
#!/usr/bin/env python # -*- coding:utf-8 -*- from celery import Celery from celery.schedules import crontab celery = Celery('xxxxxx', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379', include=['celery_tasks.tasks']) # 时区 celery.conf.timezone = 'Asia/Shanghai' # 是否使用UTC celery.conf.enable_utc = False
#!/usr/bin/env python # -*- coding:utf-8 -*- import time from .celery import celery @celery.task def hello(*args, **kwargs): print('执行hello') return "hello" @celery.task def xxxxx(*args, **kwargs): print('执行xxxxx') return "xxxxx" @celery.task def hhhhhh(*args, **kwargs): time.sleep(5) return "任务结果"
Django中应用Celery
一、基本使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
django_celery_demo ├── app01 │ ├── __init__.py │ ├── admin.py │ ├── apps.py │ ├── migrations │ ├── models.py │ ├── tasks.py │ ├── tests.py │ └── views.py ├── db.sqlite3 ├── django_celery_demo │ ├── __init__.py │ ├── celery.py │ ├── settings.py │ ├── urls.py │ └── wsgi.py ├── manage.py ├── red.py └── templates |
#!/usr/bin/env python # -*- coding:utf-8 -*- import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings') app = Celery('django_celery_demo') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks()
from .celery import app as celery_app __all__ = ('celery_app',)
from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
... .... ..... # ######################## Celery配置 ######################## CELERY_BROKER_URL = 'redis://10.211.55.20:6379' CELERY_ACCEPT_CONTENT = ['json'] CELERY_RESULT_BACKEND = 'redis://10.211.55.20:6379' CELERY_TASK_SERIALIZER = 'json'
from django.shortcuts import render, HttpResponse from app01 import tasks from django_celery_demo import celery_app from celery.result import AsyncResult def index(request): result = tasks.add.delay(1, 8) print(result) return HttpResponse('...') def check(request): task_id = request.GET.get('task') async = AsyncResult(id=task_id, app=celery_app) if async.successful(): data = async.get() print('成功', data) else: print('任务等待中被执行') return HttpResponse('...')
"""django_celery_demo URL Configuration The `urlpatterns` list routes URLs to views. For more information please see: https://docs.djangoproject.com/en/1.11/topics/http/urls/ Examples: Function views 1. Add an import: from my_app import views 2. Add a URL to urlpatterns: url(r'^$', views.home, name='home') Class-based views 1. Add an import: from other_app.views import Home 2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home') Including another URLconf 1. Import the include() function: from django.conf.urls import url, include 2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls')) """ from django.conf.urls import url from django.contrib import admin from app01 import views urlpatterns = [ url(r'^admin/', admin.site.urls), url(r'^index/', views.index), url(r'^check/', views.check), ]
二、定时任务
1. 安装
1
|
install django - celery - beat |
2. 注册app
1
2
3
4
|
INSTALLED_APPS = ( ..., 'django_celery_beat' , ) |
3. 数据库去迁移生成定时任务相关表
1
|
python manage.py migrate |
4. 设置定时任务
- 方式一:代码中配置
#!/usr/bin/env python # -*- coding:utf-8 -*- import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings') app = Celery('django_celery_demo') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') app.conf.beat_schedule = { 'add-every-5-seconds': { 'task': 'app01.tasks.add', 'schedule': 5.0, 'args': (16, 16) }, } # Load task modules from all registered Django app configs. app.autodiscover_tasks()
- 方式二:数据表录入
5. 后台进程创建任务
1
|
celery - A django_celery_demo beat - l info - - scheduler django_celery_beat.schedulers:DatabaseScheduler |
6. 启动worker执行任务
1
|
celery - A django_celery_demo worker - l INFO |
官方参考:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django