zoukankan      html  css  js  c++  java
  • Celery--进阶用法

    重点
    首先来看之前的task:
    @app.task  #普通函数装饰为 celery task
    def add(x, y):
        return x + y
    这里的装饰器app.task实际上是将一个正常的函数修饰成了一个 celery task 对象,所以这里我们可以给修饰器加上参数来决定修饰后的 task 对象的一些属性。
    首先,我们可以让被修饰的函数成为 task 对象的绑定方法,这样就相当于被修饰的函数 add 成了 task 的实例方法,可以调用 self 获取当前 task 实例的很多状态及属性。
    其次,我们也可以自己复写 task 类,然后让这个自定义 task 修饰函数 add ,来做一些自定义操作。
     
    一.根据任务状态执行不同操作
    任务执行后,根据任务状态执行不同操作需要我们复写 task 的 on_failure、on_success 等方法
    tasks.py:
    from celery_app import app
    
    class MyTask(Task):
          def on_success(self, retval, task_id, args, kwargs):
                print 'task done: {0}'.format(retval)
                return super(MyTask, self).on_success(retval, task_id, args, kwargs)
        
          def on_failure(self, exc, task_id, args, kwargs, einfo):
                print 'task fail, reason: {0}'.format(exc)
                return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
    
    @app.task(base=MyTask)
    def add(x, y):
          raise KeyError
          return x + y
    重新运行 worker:
    celery -A tasks worker --loglevel=info
    可以看到,任务执行成功或失败后分别执行了我们自定义的 on_failure、on_success
     
    二.绑定任务为实例方法
    tasks.py:
    from celery.utils.log import get_task_logger
    
    logger = get_task_logger(__name__)
    @app.task(bind=True)
    def add(self, x, y):
          logger.info(self.request.__dict__)
          return x + y
    执行中的任务获取到了自己执行任务的各种信息,可以根据这些信息做很多其他操作,例如判断链式任务是否到结尾等等。
    关于 celery.task.request 对象的详细数据可以看这里
     
    三.任务状态回调
    实际场景中得知任务状态是很常见的需求,对于 Celery 其内建任务状态有如下几种:
    参数
    说明
    PENDING
    任务等待中
    STARTED
    任务已开始
    SUCCESS
    任务执行成功
    FAILURE
    任务执行失败
    RETRY
    任务将被重试
    REVOKED
    任务取消
    当我们有个耗时时间较长的任务进行时一般我们想得知它的实时进度,这里就需要我们自定义一个任务状态用来说明进度并手动更新状态,从而告诉回调当前任务的进度,具体实现:
    tasks.py:
    from celery import Celery
    import time
    
    @app.task(bind=True)
    def test_mes(self):
        for i in xrange(1, 11):
            time.sleep(0.1)
            self.update_state(state="PROGRESS", meta={'p': i*10})
        return 'finish'
    然后在 trigger.py 中增加:
    from task import add,test_mes
    import sys
    
    def pm(body):
        res = body.get('result')
        if body.get('status') == 'PROGRESS':
            sys.stdout.write('
    任务进度: {0}%'.format(res.get('p')))
            sys.stdout.flush()
        else:
            print '
    '
            print res
    r = test_mes.delay()
    print r.get(on_message=pm, propagate=False)
     
    四.链式任务
    有些任务可能需由几个子任务组成,尽量不要以同步阻塞的方式调用子任务,而是用异步回调的方式进行链式任务的调用:
    正确示范1
    def update_page_info(url):
        # fetch_page -> parse_page -> store_page
        chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
        chain()
    
    @app.task()
    def fetch_page(url):
        return myhttplib.get(url)
    
    @app.task()
    def parse_page(page):
        return myparser.parse_document(page)
    
    @app.task(ignore_result=True)
    def store_page_info(info, url):
        PageInfo.objects.create(url=url, info=info)
    正确示范2
    fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])

    链式任务中前一个任务的返回值默认是下一个任务的输入值之一 ( 不想让返回值做默认参数可以用 si() 或者 s(immutable=True) 的方式调用 )。

    这里的 s() 是方法 celery.signature() 的快捷调用方式,signature 具体作用就是生成一个包含调用任务及其调用参数与其他信息的对象,个人感觉有点类似偏函数的概念:先不执行任务,而是把任务与任务参数存起来以供其他地方调用。
     

  • 相关阅读:
    十二月十学习报告
    十二月八学习报告
    十二月七学习报告
    学习笔记187—在线会议共享PPT时,设置让观众看不到备注,而自己能看到【腾讯会议】
    学习笔记186—打印机可以打印测试页,但是通过WPS或Word无法打印文档?
    《程序员的自我修养》读书有感 其一
    Linux下的单向ping通问题
    做一个Pandas专家,教你如何用它高效处理大量数据
    grpc python 源码分析(1):server 的创建和启动
    grpc python 源码分析(2):server 处理请求
  • 原文地址:https://www.cnblogs.com/absoluteli/p/14016774.html
Copyright © 2011-2022 走看看