1.获取结果信息
res = add.delay(2, 2) res.get(timeout=1) #4
res.id #d6b3aea2-fb9b-4ebc-8da4-848818db9114
res.failed() #False
res.successful() #True
res.state # SUCCESS
其它:
res.children[0].get() #有子任务时,可获取结果
list(res.collect()) #[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4), (<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]
2.signature
有时可能希望将任务调用的签名传递给另一个进程,或者作为另一个函数的参数,Celery为此使用了一种称为签名的东西。
签名包装了单个任务调用的参数和执行选项。
签名支持调用API:sig.apply_async(args=(), kwargs={}, **options)
定义签名:
add.signature((2, 2), countdown=10)
或者
add.s(2, 2)
示例1:
s1 = add.s(2, 2)
res = s1.delay()
res.get() #4
示例2:
s2 = add.s(2)
res = s2.delay(8)
res.get() #10
3.group
from celery import group
from proj.tasks import add
示例1:
group(add.s(i, i) for i in range(10))().get() #[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
示例2:
g = group(add.s(i) for i in range(10)) g(10).get() #[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
4.chain
任务可以链接在一起,在一个任务返回后,再另一个任务:
from celery import chain
from proj.tasks import add, mul
# (4 + 4) * 8
chain(add.s(4, 4) | mul.s(8))().get() # 64
或部分链:
# (? + 4) * 8
g = chain(add.s(4) | mul.s(8)) g(4).get() # 64
链也可以这样写:
(add.s(4, 4) | mul.s(8))().get() # 64
5.路由
Celery支持AMQP提供的所有路由功能,也支持将消息发送到命名队列的路由。
task_routes设置使您可以按名称路由任务,并将所有内容集中在一个位置:
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
还可以在运行时使用queue参数指定队列apply_async:
from proj.tasks import add
add.apply_async((2, 2), queue='hipri')
然后,通过指定选项来使工作线程从此队列中使用:
$ celery -A proj worker -Q hipri
可以使用逗号分隔的列表来指定多个队列。
$ celery -A proj worker -Q hipri,celery
6.遥控
可以在运行时控制和检查工作程序。
查看工作人员当前正在执行的任务:
$ celery -A proj inspect active
可以使用--destination选项指定一个或多个工作人员对请求执行操作。
$ celery -A proj inspect active --destination=celery@example.com
可以强制工作程序启用事件消息(用于监视任务和工作程序):
$ celery -A proj control enable_events
启用事件后,您可以启动事件转储程序以查看工作程序在做什么:
$ celery -A proj events --dump
或者启动curses界面:
$ celery -A proj events
完成监视后,可以再次禁用事件:
$ celery -A proj control disable_events
7.Link回调
@app.task
def error_handler(uuid):
result = AsyncResult(uuid)
exc = result.get(propagate=False)
print('Task {0}
raised exception: {1!r}
{2!r}'.format( uuid, exc, result.traceback))
可以使用link_error执行选项将其添加到任务中:
add.apply_async((2, 2), link_error=error_handler.s())
此外,link和link_error选项都可以表示为列表:
add.apply_async((2, 2), link=[add.s(16), other_task.s()])
然后将依次调用回调返回,使用父任务的返回值作为部分参数来调用所有回调。
可以将任意多个任务链接在一起,并且签名也可以链接:
s = add.s(2, 2) s.link(mul.s(4))
s.link(log_result.s())
8.on_message
Celery通过on_message回调来捕获所有状态的变化。
例如,对于长时间运行的任务以发送任务进度,您可以执行以下操作:
@app.task(bind=True) def hello(self, a, b): time.sleep(1) self.update_state(state="PROGRESS", meta={'progress': 50}) time.sleep(1) self.update_state(state="PROGRESS", meta={'progress': 90}) time.sleep(1) return 'hello world: %i' % (a+b) def on_raw_message(body): print(body) a, b = 1, 1 r = hello.apply_async(args=(a, b)) print(r.get(on_message=on_raw_message, propagate=False))
将生成如下输出:
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': {'progress': 50}, 'children': [], 'status': 'PROGRESS', 'traceback': None} {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': {'progress': 90}, 'children': [], 'status': 'PROGRESS', 'traceback': None} {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7', 'result': 'hello world: 10', 'children': [], 'status': 'SUCCESS', 'traceback': None} hello world: 10
9.eta和countdown
通过ETA(预计到达时间),可以设置特定的日期和时间,即最早执行任务的时间。
而countdown(倒计时)是一种以秒为单位设置ETA的快捷方式。
result = add.apply_async((2, 2), countdown=3)
result.get() # this takes at least 3 seconds to return
确保任务在指定的日期和时间之后的某个时间执行,但不一定在该确切时间执行。截止期限过长的可能原因可能包括许多项目在队列中等待,或者严重的网络延迟。
虽然countdown是整数,但eta必须是一个datetime 对象,并指定确切的日期和时间(包括毫秒精度和时区信息):
from datetime import datetime, timedelta
tomorrow = datetime.utcnow() + timedelta(days=1)
add.apply_async((2, 2), eta=tomorrow)
10.expires
add.apply_async((10, 10), expires=60) # Task expires after one minute
from now. from datetime import datetime, timedelta
add.apply_async((10, 10), kwargs, expires=datetime.now() + timedelta(days=1)
# Also supports datetime
当工人收到过期的任务时,它将任务标记为REVOKED