zoukankan      html  css  js  c++  java
  • Celery--调用task

    1.获取结果信息
    res = add.delay(2, 2) res.get(timeout=1)   #4
    res.id    #d6b3aea2-fb9b-4ebc-8da4-848818db9114
    res.failed()   #False
    res.successful()   #True
    res.state   # SUCCESS 
     
    其它:
    res.children[0].get()   #有子任务时,可获取结果
    list(res.collect()) #[(<AsyncResult: 7b720856-dc5f-4415-9134-5c89def5664e>, 4), (<AsyncResult: 8c350acf-519d-4553-8a53-4ad3a5c5aeb4>, 64)]
     
    2.signature
    有时可能希望将任务调用的签名传递给另一个进程,或者作为另一个函数的参数,Celery为此使用了一种称为签名的东西。
    签名包装了单个任务调用的参数和执行选项。
    签名支持调用API:sig.apply_async(args=(), kwargs={}, **options)
    定义签名:
    add.signature((2, 2), countdown=10)
    或者
    add.s(2, 2)
     
    示例1:
    s1 = add.s(2, 2)
    res = s1.delay()
    res.get() #4
     
    示例2:
    s2 = add.s(2)
    res = s2.delay(8)
    res.get() #10
     
    3.group
    from celery import group
    from proj.tasks import add
    示例1:
    group(add.s(i, i) for i in range(10))().get()      #[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
     
    示例2:
    g = group(add.s(i) for i in range(10)) g(10).get()     #[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
     
    4.chain
    任务可以链接在一起,在一个任务返回后,再另一个任务:
    from celery import chain
    from proj.tasks import add, mul
     
    # (4 + 4) * 8
    chain(add.s(4, 4) | mul.s(8))().get()      # 64
     
    或部分链:
    # (? + 4) * 8 
    g = chain(add.s(4) | mul.s(8)) g(4).get()      # 64
     
    链也可以这样写:
    (add.s(4, 4) | mul.s(8))().get() # 64
     
    5.路由
    Celery支持AMQP提供的所有路由功能,也支持将消息发送到命名队列的路由。
    task_routes设置使您可以按名称路由任务,并将所有内容集中在一个位置:
    app.conf.update(
      task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
      },
    )
    还可以在运行时使用queue参数指定队列apply_async:
    from proj.tasks import add
    add.apply_async((2, 2), queue='hipri')
    然后,通过指定选项来使工作线程从此队列中使用:
    $ celery -A proj worker -Q hipri
    可以使用逗号分隔的列表来指定多个队列。
    $ celery -A proj worker -Q hipri,celery
     
    6.遥控
    可以在运行时控制和检查工作程序。
    查看工作人员当前正在执行的任务:
    $ celery -A proj inspect active
    可以使用--destination选项指定一个或多个工作人员对请求执行操作。
    $ celery -A proj inspect active --destination=celery@example.com
    可以强制工作程序启用事件消息(用于监视任务和工作程序):
    $ celery -A proj control enable_events
    启用事件后,您可以启动事件转储程序以查看工作程序在做什么:
    $ celery -A proj events --dump
    或者启动curses界面:
    $ celery -A proj events
    完成监视后,可以再次禁用事件:
    $ celery -A proj control disable_events
     
    7.Link回调
    @app.task
    def error_handler(uuid):
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task {0}
    raised exception: {1!r} {2!r}'.format( uuid, exc, result.traceback))
    可以使用link_error执行选项将其添加到任务中:
    add.apply_async((2, 2), link_error=error_handler.s())
    此外,link和link_error选项都可以表示为列表:
    add.apply_async((2, 2), link=[add.s(16), other_task.s()])
    然后将依次调用回调返回,使用父任务的返回值作为部分参数来调用所有回调。
     
    可以将任意多个任务链接在一起,并且签名也可以链接:
    s = add.s(2, 2) s.link(mul.s(4))
    s.link(log_result.s())
     
    8.on_message
    Celery通过on_message回调来捕获所有状态的变化。
    例如,对于长时间运行的任务以发送任务进度,您可以执行以下操作:
    @app.task(bind=True)
    def hello(self, a, b):
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={'progress': 50})
        time.sleep(1)
        self.update_state(state="PROGRESS", meta={'progress': 90})
        time.sleep(1)
        return 'hello world: %i' % (a+b)
    
    def on_raw_message(body):
        print(body)
    
    a, b = 1, 1
    r = hello.apply_async(args=(a, b))
    print(r.get(on_message=on_raw_message, propagate=False))

    将生成如下输出:

    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
     'result': {'progress': 50},
     'children': [],
     'status': 'PROGRESS',
     'traceback': None}
    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
     'result': {'progress': 90},
     'children': [],
     'status': 'PROGRESS',
     'traceback': None}
    {'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
     'result': 'hello world: 10',
     'children': [],
     'status': 'SUCCESS',
     'traceback': None}
    hello world: 10
     
    9.eta和countdown
    通过ETA(预计到达时间),可以设置特定的日期和时间,即最早执行任务的时间。
    而countdown(倒计时)是一种以秒为单位设置ETA的快捷方式。
    result = add.apply_async((2, 2), countdown=3)
    result.get() # this takes at least 3 seconds to return
     
    确保任务在指定的日期和时间之后的某个时间执行,但不一定在该确切时间执行。截止期限过长的可能原因可能包括许多项目在队列中等待,或者严重的网络延迟。
    虽然countdown是整数,但eta必须是一个datetime 对象,并指定确切的日期和时间(包括毫秒精度和时区信息):
    from datetime import datetime, timedelta
    tomorrow = datetime.utcnow() + timedelta(days=1)
    add.apply_async((2, 2), eta=tomorrow)
     
    10.expires
    add.apply_async((10, 10), expires=60)    # Task expires after one minute
    from now. from datetime import datetime, timedelta
    add.apply_async((10, 10), kwargs, expires=datetime.now() + timedelta(days=1)
    # Also supports datetime
    当工人收到过期的任务时,它将任务标记为REVOKED
     
     

  • 相关阅读:
    springcloud2.0 添加配置中心遇到的坑
    redis 字符串和集合操作
    Linux
    流畅的python 符合python风格的对象
    流畅的python 对象引用 可变性和垃圾回收
    流畅的python 闭包
    流畅的python 使用一等函数实现设计模式
    互联网协议入门
    流畅的python 字典和集合
    流畅的python python数据模型
  • 原文地址:https://www.cnblogs.com/absoluteli/p/14016955.html
Copyright © 2011-2022 走看看