zoukankan      html  css  js  c++  java
  • celery (二) task

    Task

    task 具有如下特点:

    1. task 可以在任何可调用的地方创建。它有双重角色:

      • 定义了当task被调用时,会发送一个消息。
      • 定义了当worker收到消息时会运行消息对应的函数
    2. 每个task都有一个唯一的名字。因此worker才会找到其对应的函数。

    3. 每个task消息都会保存在queue中,直到收到从worker收来的 acknowledged。如果worker因为某些原因死亡了,那么该task消息会被再投递到其它的worker

    4. task不能保证每个任务的幂等性,因此,默认情况下,woker会先回复 acknowledged,然后再执行。因此在确保task幂等的情况下,可以使用task_late 参数,使worker在执行完task之后在回复 acknowledged。

    5. 在如下情况下,woker将会发送 acknowledged消息,即使子进程已经被杀死:

      • 我们不希望task重新运行,因此通过信号/命令等强制核心杀死task进程
      • 系统管理员故意杀死进程,并且不希望它重新运行
      • task需要过得的内存,可能导致核心OOM杀手
      • task即使被重分发了依旧总是失败,产生高频率的消息循环,导致系统失败。

      如果在上述情况中依旧希望task被重新分发,那么可以使用 task_reject_on_worker_lost进行配置。

    基础

    创建task

    @app.task
    def function():
        return 123
    

    多层装饰器

    多个装饰器要确保 @task 在最上层(最外层)

    @app.task
    @decorator1
    @decorator2
    def function(x, y):
        return x + y
    

    绑定task

    绑定task的意思是,task的第一个参数使 task实例本身。

    @app.task
    def function(self, x, y):
        return self.request.id
    

    自定义Task类

    通过 @taskbase 参数指定task实现的Task类

    @app.task(base=MyTask)
    def function(x, y):
        return 123
    

    task名

    每个task都有一个唯一的名字。可以通过name 参数来给task命名。

    @app.task(name='hh')
    def function(x, y):
        return x + y
    

    如果没有给定task名字,那么task会自动生成一个名字。自动生成的名字由以下部分组成:

    1. 定义task的模块的名字
    2. @task 装饰的函数的名字
    # tasks.py
    
    @app.task
    def function(x, y):
        return x + y
    # view.py
    from tasks import function
    print(function.name)
    
    # tasks.function
    
    导入

    关于task的导入有两个建议:

    1. 使用绝对导入
    2. 尽量动源头开始导入

    例如:在Django中,有如下配置:INSTALLED_APPS = ['project.myapp']

    from project.myapp.tasks import mytask  # 这种写法比较好
    mytask.name   # 'project.myapp.tasks.mytask'
    -------------------------------------------------------
    from myapp.tasks import mytask  # 这种不好,会导致worker注册表中task名字对应不上
    mytask.name  # 'myapp.tasks.mytask'
    

    修改自定义名字

    如果希望修改task自动生成的名字,可以通过覆盖app.gen_task_name() 来实现。

    Request

    app.Task.request 包含当前执行的task的相关信息

    属性 信息
    id task的id
    group task的group的id
    chord task 属于的chord的id
    correlation_id
    args 传入task函数的位置参数
    kwargs 传入task函数的关键字参数
    origin 发送该task的主机的名字
    retries task重试次数
    is_eager 是否在本地以普通函数的方式在本地执行,不通过worker
    eta 任务开始的绝对时间(可能会与countdown,retry冲突)
    expires 超时时间(任务超时会被取消)
    hostname 执行该task的worker的主机名
    delivery_info 字典的方式存储 分发该任务的 exchange 和 routing key 信息
    reply-to 发送回复的目的地的队列的名字
    called_directly 如果task没有通过worker,而是在本地执行,那么该值为True。
    timelimit 一个元组(软,硬)限制task的活动时间
    callbacks task被成功执行后执行的函数
    errback task执行失败后执行的函数
    chain 反转一个task列表,形成一个链,列表的最后一个将会是当前task成功执行后的下一个要执行的task

    Retry

    app.Task.retry()可以用来重分发task。调用retry的时候,会重新分发一个task message,使用和之前一样的task-id,被发送到相同的queue。重分发也会更改task的状态

    @app.task(bind=True)
    def function(self, x, y):
        try:
            1/0
        except ZeroDivisionError as e:
            raise self.retry(exc=e)
    

    app.Task.retry 会引发一个异常,因此,该函数之后的代码将不会被执行。

    如果task配置了max_retries 属性,那么在超过重试次数之后,还会引发当前的异常(在本例中是ZeroDivisionError),但是如果有如下两种情况,就会引发其他的异常:

    • 如果没有配置 exc 参数,那么就会引发 MaxRetriesExceededError
    • 指定引发其他异常,self.retry(exc=IndexError())

    countdown

    定义一个时间——task再次被执行前等待的时间

    @app.task(bind=True, default_retry_delay=30 * 60)  # 等待30分钟,默认值3分钟
    def function()
    	try:
            1/0
        except ZeroDivisionError as e:
            raise self.retry(exc=e, countdown=60)  # 等待60秒,将会覆盖default_retry_delay
    

    autoretry_for

    自动重试。 根据指定的错误,自动重试

    @app.task(autoretry_for=(SomeError,))
    def function():
        ....
    

    retry_kwargs

    如果想在 retry() 时传入自定义参数

    @app.task(autoretry_for=(SomeError,),retry_kwargs={"max_retries":5})
    def functon():
        ...
    

    以上类似于

    @app.task(bind=True)
    def function():
        try:
            ...
        exception SomeError as e:
            raise self.retry(exc=e, max_retries=5)
    

    retry_backoff

    指数退避; 在重试的过程中,面对冲突,指数退避,默认会把最大延迟限定在10分钟内

    如果使用了该配置,那么Task 的 countdown将会被忽略

    @app.task(autoretry_for=(SomeException,), retry_baskoff=True)
    def function():
        ...
    

    此配置既可以为 布尔值, 也可以是 数字

    • 为 布尔值

      各次延迟重试时间依据如下规律长

      (2^2​) (2^1​) (2^2​) (2^3​) (2^4​)

    • 为 数字(假设为x)

      各次延迟重试时间依据如下规律增长

      (x imes 2^0) (x imes2^1) (x imes 2^2) (x imes 2^3) (x imes2^4) ….

    retry_backoff_max

    自动重试延迟最大时间

    如果使用 retry_backoff ,那么可以通过该参数配置延迟重试的最大时间。单位为秒,默认为 60*10

    retry_jetter

    延迟重试过程中,加入随机抖动。默认为True

    如果使用了该配置,那么retry_backoff 计算出的时间被视为最大延迟时间,实际重试时间为 0 到最大重试延迟间的随机数

    Task其他配置

    name

    task 的注册名; 默认为所在model名+函数名

    request

    包含当前请求的相关信息

    max_retries

    最大重试次数,默认为 3, 如果设置为 None 那么将一直重试直至成功。

    throws

    预期错误,一个元组。元组中的错误,会被作为失败存储在后端,worker将不会其作为错误记录。且也不包括追踪回溯信息。

    @app.task(throws=(ExpectedError, ))
    def function():
        ...
    

    default_retry_delay

    默认的重试延迟时间,单位 intfloat 都行。默认是 60*3

    rate_limit

    设定该类型task在同一个worker被处理的速率,值为 intfloat, 单位为 /s /m /h

    默认设置为设置中的 task_default_rate_limit , 如果没有配置,那么表示没有限速。

    time_limit

    限定task的执行时间,如果超过该时间task仍未完成,那么执行此task的worker将被杀死,并自动产生一个新的worker

    soft_time_limit

    软限定,当该时间到达后,会引发一个SoftTimeLimitExceeded 异常。可以被捕获并处理。

    ignore_result

    不记录task的状态。如果使用该配置,那么将不能使用AsyncResult 检查task状态或者是获取task的返回值

    store_errors_even_if_ignored

    如果使用该配置,即使配置了 ignore_result 也会存储task执行中的错误。

    serializer

    序列化,可以是 json pickle yaml 或者是其他的自定义序列化反式

    compression

    压缩方式,可以是 gzip bzip2 或者其他的自定义压缩方式

    backend

    后端,用来存储task的状态、结果

    acks_late

    task被执行完成之后,才会向队列发送 acknowledged

    因为不确定任务是否为幂等的,所以,默认是在任务执行前发送 ack

    track_started

    在task开始执行的时候,在task的状态中增加 started 状态,默认是不记录该状态的。

    该配置的全局性配置为 task_track_started

    Task状态

    Celery持续跟踪任务的状态。这其中既包括执行成功的task的结果,也包括失败task的错误信息

    内置的状态如下

    • PENDING

      任务的默认状态(即将执行,或者是 未知的任务)

    • STARTED

      任务开始执行。默认不记录该状态

    • SUCCESS

      任务被成功执行。结果存储在后端中

    • FAUILURE

      任务失败,错误信息存储在后端中

    • RETRY

      task 被重试

    • REVOKED

      task 被取消

    自定义task状态

    使用 update_state() 更新任务的状态

    @app.task(bind=True)
    def function(self):
        self.update_state(state='CHECKPOINT', meta={'error':1})
    

    Ignore

    使worker忽略该task,但该task仍旧会回复 ack 到celery,且该task将从队列中移除

    Request 中的处理

    after_return

    task返回之后调用

    on_failure

    task失败后调用

    on_retry

    task retry之后执行

    on_success

    task成功之后执行

    自定义Request类

    class MyRequest(Request):
        def on_timeout(self, soft, time):
            super(MyRequest, self,).on_timeout(soft, time)
            ...
    class MyTask(Task):
        Request = MyRequest
    
    @app.task(base=MyTask)
    def function():
        ...
    
  • 相关阅读:
    linux基础
    sublime、Typora
    Windows cmd命令
    idea打包java可执行jar包
    idea常用快捷键
    Linux入门学习笔记1:VI常用命令
    442. Find All Duplicates in an Array
    566. Reshape the Matrix
    766. Toeplitz Matrix
    561. Array Partition I
  • 原文地址:https://www.cnblogs.com/jijizhazha/p/9902054.html
Copyright © 2011-2022 走看看