zoukankan      html  css  js  c++  java
  • celery (二) task调用

    调用 TASK

    基础

    task 的调用方式有三种:

    • 类似普通函数的调用方式, 通过 __calling__ 调用 ,类似 function()
    • 通过 apply_async() 调用,能接受较多的参数
    • 通过 delay() 调用 ,是apply_async 方法的快捷方法,可接受的参数较少
    task.delay(arg1, arg2, kwarg1=1, kwarg2=2)
    等同于
    task.apply_async(args=[arg1, arg2], kwargs={'kwarg':1, 'kwarg2':2})
    

    链接任务

    通过链接的方式,可以在一个任务执行完毕之后,执行另一个任务。

    add.apply_async(args=(2,2),link=add.s(6))
    

    当第一个task完成之后,task的结果会作为第二个函数参数的的一部分传入第二个task。

    上例第一个task结果为 4, 第二个task执行的是 (4 + 6)

    如果第一个task失败,那么第一个task的 id 会被传入到第二个task中

    @app.task
    def error_handler(uuid):
        result = AsyncResult(uuid)
        exc = result.get(propagate=False)
        print('Task {0} raised exception: {1!r}
    {2!r}'.format(
              uuid, exc, result.traceback))
    
    add.apply_async(args=(2), link=error_handler.s())
    

    当然,两个是可以同时调用的

    add.apply_async((2, 2), link=[add.s(16), error_handler21111.s()])
    

    追踪状态

    通过设置 on_message 回调函数,可以追踪 task 的状态变化

    @app.task(bind=True)
    def hello(self, a, b):
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={'progress': 50})
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={'progress': 90})
        time.sleep(1)
        return 'hello world: %i' % (a+b)
    
    def on_raw_message(body):
        print(body)
    
    r = hello.apply_async()
    print(r.get(on_message=on_raw_message, propagate=False))
    
    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
     'result': {'progress': 50},
     'children': [],
     'status': 'PROGRESS',
     'traceback': None}
    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
     'result': {'progress': 90},
     'children': [],
     'status': 'PROGRESS',
     'traceback': None}
    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
     'result': 'hello world: 10',
     'children': [],
     'status': 'SUCCESS',
     'traceback': None}
    hello world: 10
    

    ETA 和countdown 延迟执行

    ETA(预估到达时间)配置一个具体的时间,是一个时间对象,这个时间是相关task的最早的执行时间(也就是说,该任务实际执行时间,可能晚于该时间)。countdown是ETA的快捷方式,countdown 是相对(当前)时间,单位是 秒。它表示该任务会在多少秒之后执行。

    >>> from datetime import datetime, timedelta
    
    >>> tomorrow = datetime.utcnow() + timedelta(days=1)
    >>> add.apply_async((2, 2), eta=tomorrow) # 明天的当前时间执行
    
    >>> result = add.apply_async((2, 2), countdown=3)
    >>> result.get()  # 3秒后执行
    

    Expiration 任务保质期

    通过配置 expiration 参数给task设置一个 过期时间,来保证task的时效性。当worker收到一个过期的任务之后,会标记该任务为 revoked(取消)状态。expiration 既可以是相对时间(单位:秒),也可以是绝对时间(时间对象)

    add.apply_async((10, 10), expires=60)
    
    >>> from datetime import datetime, timedelta
    >>> add.apply_async((10, 10), kwargs,
    ...                 expires=datetime.now() + timedelta(days=1)
    

    重试机制

    celery会在连接失败的时候,自动尝试重新发送task。一般收到一个task,都会有一条 收到task的log信息。

    通过设置 retry=False 来禁用自动重试。当然也可以通过配置其他参数来,配置celery自动重试的策略。

    max_retries 最大重试次数

    默认为3,如果设置为 None,表示一直重试。如果超过重试次数依旧失败,会引发一个导致重试失败的异常。

    interval_start 重试等待时间

    在多久之后开始重试,默认为 0 ,即可以重试。但是为 秒

    interval_step 延迟重试 步长

    连续重试的时候,每次重试之后,其延迟时间都会加上该参数的值。 默认是 0.2 ,单位为 秒

    interval_max 重试延迟最大等待时间

    每次重试之间,最大等待时间。 默认是 0.2 , 单位为 秒

    add.apply_async((2, 2), retry=True, retry_policy={
        'max_retries': 3,
        'interval_start': 0,
        'interval_step': 0.2,
        'interval_max': 0.2,
    })
    # 最大重试次数为 3,第一次会在失败之后,立刻执行; 第二次会在第一次失败之后,等待0+0.2s执行;第三次会会 0+0.2(两次重试最大间隔为0.2,所以依旧是0.2,而不是0.4)。所以,三次重试一共耗时 0.2+0.2 = 0.4s
    

    连接失败,或者是无法建立连接的时候,celery会引发OperationalError 异常。但是如果配置了自动重试,那么该异常只会在重试次数耗尽之后,依旧无法建立连接的时候,才引发。

    >>> from celery.utils.log import get_logger
    >>> logger = get_logger(__name__)
    
    >>> try:
    ...     add.delay(2, 2)
    ... except add.OperationalError as exc:
    ...     logger.exception('Sending task raised: %r', exc)
    

    序列化

    在celery的客户端和worker之间发送消息的时候,需要对消息进行序列化。默认的序列化方式是 JSON,可以通过在 setting 中配置 task_serializer 来更改默认的序列化方式,当然可以对每个task分别设置序列化方式。支持的序列化方式有:JSON YAML PICKLE msgpck

    压缩

    celery 同样可以在传送消息的时候,对其进行压缩。压缩方式有:gzipbzip2

    有如下三种方式来配置压缩属性,按优先级分别为:

    • compression 调用task时,配置该参数
    • Task.compression 属性。配置自定义 Task类的属性
    • task_compression 在配置文件中配置 该属性
  • 相关阅读:
    Windows和linux(ubuntu)互传文件简便快捷的方法
    Monkey与MonkeyRunner之间的区别
    Monkeyrunner 简介及其环境搭建
    如何查看Android apk的包名?
    Android自动化测试如何获取坐标点?
    android自动化测试之Monkey--从参数讲解、脚本制作到实战技巧
    Docker 命令
    在新安装的Linux系统中,防火墙默认是被禁掉的,一般也没有配置过任何防火墙的策略,所有不存在/etc/sysconfig/iptables文件。
    CentOS7中使用iptables
    centos7下找不到iptables文件
  • 原文地址:https://www.cnblogs.com/jijizhazha/p/10012254.html
Copyright © 2011-2022 走看看