Task
task 具有如下特点:
-
task 可以在任何可调用的地方创建。它有双重角色:
- 定义了当task被调用时,会发送一个消息。
- 定义了当worker收到消息时会运行消息对应的函数
-
每个task都有一个唯一的名字。因此worker才会找到其对应的函数。
-
每个task消息都会保存在queue中,直到收到从worker收来的 acknowledged。如果worker因为某些原因死亡了,那么该task消息会被再投递到其它的worker
-
task不能保证每个任务的幂等性,因此,默认情况下,woker会先回复 acknowledged,然后再执行。因此在确保task幂等的情况下,可以使用
task_late
参数,使worker在执行完task之后在回复 acknowledged。 -
在如下情况下,woker将会发送 acknowledged消息,即使子进程已经被杀死:
- 我们不希望task重新运行,因此通过信号/命令等强制核心杀死task进程
- 系统管理员故意杀死进程,并且不希望它重新运行
- task需要过得的内存,可能导致核心OOM杀手
- task即使被重分发了依旧总是失败,产生高频率的消息循环,导致系统失败。
如果在上述情况中依旧希望task被重新分发,那么可以使用
task_reject_on_worker_lost
进行配置。
基础
创建task
@app.task
def function():
return 123
多层装饰器
多个装饰器要确保 @task
在最上层(最外层)
@app.task
@decorator1
@decorator2
def function(x, y):
return x + y
绑定task
绑定task的意思是,task的第一个参数使 task实例本身。
@app.task
def function(self, x, y):
return self.request.id
自定义Task类
通过 @task
的 base
参数指定task实现的Task类
@app.task(base=MyTask)
def function(x, y):
return 123
task名
每个task都有一个唯一的名字。可以通过name
参数来给task命名。
@app.task(name='hh')
def function(x, y):
return x + y
如果没有给定task名字,那么task会自动生成一个名字。自动生成的名字由以下部分组成:
- 定义task的模块的名字
@task
装饰的函数的名字
# tasks.py
@app.task
def function(x, y):
return x + y
# view.py
from tasks import function
print(function.name)
# tasks.function
导入
关于task的导入有两个建议:
- 使用绝对导入
- 尽量动源头开始导入
例如:在Django中,有如下配置:INSTALLED_APPS = ['project.myapp']
:
from project.myapp.tasks import mytask # 这种写法比较好
mytask.name # 'project.myapp.tasks.mytask'
-------------------------------------------------------
from myapp.tasks import mytask # 这种不好,会导致worker注册表中task名字对应不上
mytask.name # 'myapp.tasks.mytask'
修改自定义名字
如果希望修改task自动生成的名字,可以通过覆盖app.gen_task_name()
来实现。
Request
app.Task.request 包含当前执行的task的相关信息
属性 | 信息 |
---|---|
id | task的id |
group | task的group的id |
chord | task 属于的chord的id |
correlation_id | |
args | 传入task函数的位置参数 |
kwargs | 传入task函数的关键字参数 |
origin | 发送该task的主机的名字 |
retries | task重试次数 |
is_eager | 是否在本地以普通函数的方式在本地执行,不通过worker |
eta | 任务开始的绝对时间(可能会与countdown,retry冲突) |
expires | 超时时间(任务超时会被取消) |
hostname | 执行该task的worker的主机名 |
delivery_info | 字典的方式存储 分发该任务的 exchange 和 routing key 信息 |
reply-to | 发送回复的目的地的队列的名字 |
called_directly | 如果task没有通过worker,而是在本地执行,那么该值为True。 |
timelimit | 一个元组(软,硬)限制task的活动时间 |
callbacks | task被成功执行后执行的函数 |
errback | task执行失败后执行的函数 |
chain | 反转一个task列表,形成一个链,列表的最后一个将会是当前task成功执行后的下一个要执行的task |
Retry
app.Task.retry()
可以用来重分发task。调用retry的时候,会重新分发一个task message,使用和之前一样的task-id,被发送到相同的queue。重分发也会更改task的状态
@app.task(bind=True)
def function(self, x, y):
try:
1/0
except ZeroDivisionError as e:
raise self.retry(exc=e)
app.Task.retry
会引发一个异常,因此,该函数之后的代码将不会被执行。
如果task配置了max_retries
属性,那么在超过重试次数之后,还会引发当前的异常(在本例中是ZeroDivisionError),但是如果有如下两种情况,就会引发其他的异常:
- 如果没有配置
exc
参数,那么就会引发MaxRetriesExceededError
- 指定引发其他异常,
self.retry(exc=IndexError())
countdown
定义一个时间——task再次被执行前等待的时间
@app.task(bind=True, default_retry_delay=30 * 60) # 等待30分钟,默认值3分钟
def function()
try:
1/0
except ZeroDivisionError as e:
raise self.retry(exc=e, countdown=60) # 等待60秒,将会覆盖default_retry_delay
autoretry_for
自动重试。 根据指定的错误,自动重试
@app.task(autoretry_for=(SomeError,))
def function():
....
retry_kwargs
如果想在 retry() 时传入自定义参数
@app.task(autoretry_for=(SomeError,),retry_kwargs={"max_retries":5})
def functon():
...
以上类似于
@app.task(bind=True)
def function():
try:
...
exception SomeError as e:
raise self.retry(exc=e, max_retries=5)
retry_backoff
指数退避; 在重试的过程中,面对冲突,指数退避,默认会把最大延迟限定在10分钟内
如果使用了该配置,那么Task 的 countdown将会被忽略
@app.task(autoretry_for=(SomeException,), retry_baskoff=True)
def function():
...
此配置既可以为 布尔值, 也可以是 数字。
-
为 布尔值
各次延迟重试时间依据如下规律长
(2^2) (2^1) (2^2) (2^3) (2^4) …
-
为 数字(假设为x)
各次延迟重试时间依据如下规律增长
(x imes 2^0) (x imes2^1) (x imes 2^2) (x imes 2^3) (x imes2^4) ….
retry_backoff_max
自动重试延迟最大时间
如果使用 retry_backoff ,那么可以通过该参数配置延迟重试的最大时间。单位为秒,默认为 60*10
retry_jetter
延迟重试过程中,加入随机抖动。默认为True
如果使用了该配置,那么retry_backoff 计算出的时间被视为最大延迟时间,实际重试时间为 0 到最大重试延迟间的随机数
Task其他配置
name
task 的注册名; 默认为所在model名+函数名
request
包含当前请求的相关信息
max_retries
最大重试次数,默认为 3, 如果设置为 None
那么将一直重试直至成功。
throws
预期错误,一个元组。元组中的错误,会被作为失败存储在后端,worker将不会其作为错误记录。且也不包括追踪回溯信息。
@app.task(throws=(ExpectedError, ))
def function():
...
default_retry_delay
默认的重试延迟时间,单位 秒 ,int, float 都行。默认是 60*3
rate_limit
设定该类型task在同一个worker被处理的速率,值为 int 或 float, 单位为 /s /m /h
默认设置为设置中的 task_default_rate_limit , 如果没有配置,那么表示没有限速。
time_limit
限定task的执行时间,如果超过该时间task仍未完成,那么执行此task的worker将被杀死,并自动产生一个新的worker
soft_time_limit
软限定,当该时间到达后,会引发一个SoftTimeLimitExceeded
异常。可以被捕获并处理。
ignore_result
不记录task的状态。如果使用该配置,那么将不能使用AsyncResult
检查task状态或者是获取task的返回值
store_errors_even_if_ignored
如果使用该配置,即使配置了 ignore_result
也会存储task执行中的错误。
serializer
序列化,可以是 json
pickle
yaml
或者是其他的自定义序列化反式
compression
压缩方式,可以是 gzip
bzip2
或者其他的自定义压缩方式
backend
后端,用来存储task的状态、结果
acks_late
task被执行完成之后,才会向队列发送 acknowledged
。
因为不确定任务是否为幂等的,所以,默认是在任务执行前发送 ack
track_started
在task开始执行的时候,在task的状态中增加 started
状态,默认是不记录该状态的。
该配置的全局性配置为 task_track_started
Task状态
Celery持续跟踪任务的状态。这其中既包括执行成功的task的结果,也包括失败task的错误信息
内置的状态如下
-
PENDING
任务的默认状态(即将执行,或者是 未知的任务)
-
STARTED
任务开始执行。默认不记录该状态
-
SUCCESS
任务被成功执行。结果存储在后端中
-
FAUILURE
任务失败,错误信息存储在后端中
-
RETRY
task 被重试
-
REVOKED
task 被取消
自定义task状态
使用 update_state()
更新任务的状态
@app.task(bind=True)
def function(self):
self.update_state(state='CHECKPOINT', meta={'error':1})
Ignore
使worker忽略该task,但该task仍旧会回复 ack 到celery,且该task将从队列中移除
Request 中的处理
after_return
task返回之后调用
on_failure
task失败后调用
on_retry
task retry之后执行
on_success
task成功之后执行
自定义Request类
class MyRequest(Request):
def on_timeout(self, soft, time):
super(MyRequest, self,).on_timeout(soft, time)
...
class MyTask(Task):
Request = MyRequest
@app.task(base=MyTask)
def function():
...