一 . 异步
消费者文件
from celery import Celery
# 消费者, 生产者生产的放在broker里,返回值可以放在backend(可不写)
# 这两个容器可以是rabbitmq也可以是redis
app = Celery('task', broker='redis://192.168.111.129:6379/6',
backend='redis://192.168.111.129:6379/5')
@app.task
def myfunc1():
return '我是func1'
# celery worker -A asyn -l info -P eventlet(在Terminal中输入这个指令,相当于run)
生产者文件
from asyn import myfunc1, app
from celery.result import AsyncResult
# 生产者
for i in range(10):
# 开始生产
producer = myfunc1.delay()
print(producer)
# 开始生产
producer = myfunc1.delay()
res = AsyncResult(id=producer.id, app=app)
# 获取返回值, 前提是消费者那里写了backup
print(producer.get())
# 查看状态
print(producer.status)
# 判断是否成功,返回布尔值
print(producer.successful())
# 右键run执行
二 . 延时任务
消费者文件(同上)
生产者文件
from asyn import myfunc1
# 延时任务
producer = myfunc1.apply_async(countdown=5) # 倒计时5秒
# 右键run运行
三 . 周期性任务
消费者文件(同上)
生产者文件
from asyn import app
from celery.beat import crontab
app.conf.beat_schedule = { # 必须是这样写
'cycleTask': {
'task': 'asyn.myfunc1', # asyn文件中的myfunc1函数
'schedule': 5, # 每隔时间执行一下
# args 如果需要传参的话可以传
},
'crontab': {
'task': 'asyn.myfunc1', # asyn文件中的myfunc1函数
'schedule': crontab(minute=20), # 分钟为20的时候执行
# args 如果需要传参的话可以传
}
}
# 这个文件中不能右键run了,也在Terminal中执行下面的指令
# celery beat -A cycle_task -l info # 这个 -A 后面写的是生产者所在文件名
四 . 生产者消费者放到文件下的正确执行方法