创建计划任务:
from celery import Celery
import time
my_task=Celery("task",broker="redis://:123456@127.0.0.1:6379",backend="redis://:123456@127.0.0.1:6379")
@my_task.task
def my_func1():
time.sleep(10)
return "任务1"
@my_task.task
def my_func2():
return "任务2"
@my_task.task
def my_func3():
return "任务"
调用方法执行指定的任务:
from s1 import my_func1
res=my_func1.delay()
print(res)
获取返回值中运行计划的ID
判断计划是否执行完成:
from celery.result import AsyncResult
from s1 import my_task
async_task=AsyncResult(id="48029f4f-769e-438b-ac97-e89cc0bb1157",app=my_task)
# result=async_task.get()
if async_task.successful():
result=async_task.get()
print(result+"OK!")
else:
print("任务还未执行完成!")
启动celery在命令行执行: Celery worker -A s1 -l INFO -P eventlet -c 6
-A:指定要执行的目录
-l: 指定要使用的打印日志级别
-p:指定使用eventlet插件 让高版本celery支持window平台
-c:指定可执行的计划数量