zoukankan      html  css  js  c++  java
  • celery 笔记

    参考:https://blog.csdn.net/tichimi3375/article/details/82415412

    中文翻译:https://www.celerycn.io/      https://blog.csdn.net/weixin_40475396/article/details/80439781

    官网:http://docs.celeryproject.org/en/latest/index.html

    注意事项:

    1 cd到tasks.py同级目录中,执行命令

     2 导入配置的一种方式

    3 my_task.apply_async((2, 2), queue='my_queue', countdown=10) 任务my_task将会被发送到my_queue队列中,并且在发送10秒之后执行。

     4 

     5

    group: 一组任务并行执行,返回一组返回值,并可以按顺序检索返回值。

      chain: 任务一个一个执行,一个执行完将执行return结果传递给下一个任务函数

    from proj.tasks import my_task1
    from proj.tasks import my_task2
    from proj.tasks import my_task3
    from celery import group
     
    # 将多个signature放入同一组中
    my_group = group((my_task1.s(10, 10), my_task2.s(20, 20), my_task3.s(30, 30)))
    ret = my_group() # 执行组任务
    print(ret.get())  # 输出每个任务结果
    from proj.tasks import my_task1
    from proj.tasks import my_task2
    from proj.tasks import my_task3
    from celery import chain
     
    # 将多个signature组成一个任务链
    # my_task1的运行结果将会传递给my_task2
    # my_task2的运行结果会传递给my_task3
    my_chain = chain(my_task1.s(10, 10) | my_task2.s(20) | my_task3.s(30))
    ret = my_chain()  # 执行任务链
    print(ret.get())  # 输出最终结果

    6 my_task1.apply_async(queue='queue1')通过apply_aynsc()方法来设置任务发送到那个队列中

    7 celery -A proj worker --loglevel=info -Q queue1,queue2 设置一个worker服务器处理两个队列中的任务

    8 celery beat是一个调度器,它可以周期内指定某个worker来执行某个任务。

    启动woker处理周期性任务: celery -A proj worker --loglevel=info --beat

    beat_schedule = {
        'every-5-minute':
            {
                'task': 'proj.tasks.period_task',
                'schedule': 5.0,
                'args': (16, 16),
            },
        'add-every-monday-morning': {
            'task': 'proj.tasks.period_task',
            'schedule': crontab(hour=7, minute=30, day_of_week=1),
            'args': (16, 16),
        },
     
    }

    9

    from celery import Celery
     
    # 创建celery实例
    app = Celery('demo')
    app.config_from_object('proj.celeryconfig')
     
    # 自动搜索任务
    app.autodiscover_tasks(['proj'])

     10 定义app时要指定名称,否则如下

    当这个模块运行,任务将以前缀 __main__ 命名,但是当该模块被其他进程引入来运行一个任务,这个任务的名称将以前缀 tasks 命名(即这个模块的真实名称)

    【app名称不指定,则task的名称不固定,就不方便根据任务名称映射出实际任务函数】

    from celery import Celery
    app = Celery()   --app未指定名称
    
    @app.task
    def add(x, y): return x + y
    
    if __name__ == '__main__':
        app.worker_main()

    最佳实践如下:

    >>> app = Celery('tasks')
    >>> app.main
    'tasks'
    
    >>> @app.task
    ... def add(x, y):
    ...     return x + y
    
    >>> add.name
    tasks.add

    参考:https://blog.csdn.net/libing_thinking/article/details/78541171

    11 客户端导入模块 myapp.tasks 时使用 .tasks ,而工作单元导入模块使用 myapp.tasks, 他们产生的名称会不匹配,任务调用时工作单元会报 NotRegistered 错误。

    >>> from project.myapp.tasks import mytask
    >>> mytask.name
    'project.myapp.tasks.mytask'
    
    >>> from myapp.tasks import mytask
    >>> mytask.name
    'myapp.tasks.mytask'

    基于这一点,你必须在导入模块时保持一致,这也是 python 的最佳实践

    参考:https://blog.csdn.net/libing_thinking/article/details/78547816

    12 

    celery_demo                    # 项目根目录
        ├── celery_app             # 存放 celery 相关文件
        │   ├── __init__.py
        │   ├── celeryconfig.py    # 配置文件
        │   ├── task1.py           # 任务文件 1
        │   └── task2.py           # 任务文件 2
        └── client.py              # 应用程序

    执行 python client.py 就生产出了task

    在celery_demo 目录下执行 celery worker -A celery_app -l info -Q email 就执行了任务

    13

    https://www.cnblogs.com/kangoroo/p/6588615.html

    from celery.app.task import Task
    
    class CallbackTask(Task):
    
        def __init__(self):
            super(CallbackTask, self).__init__()
    
    def on_success(self, retval, task_id, args, kwargs):
            try:
                item_param= json.loads(args[0])
                logger.info('[task_id] %s, [task_type] %s, finished successfully.' % (task_id, item_param.get('task_type')))
            except Exception, ex:
                logger.error(traceback.format_exc())
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            try:
                item_param = json.loads(args[0])
                logger.error(('Task {0} raised exception: {1!r}
    {2!r}'.format(
                        task_id, exc, einfo.traceback)))
            except Exception, ex:
                logger.error(traceback.format_exc())
    from celery import task
    from common.callback import CallbackTask
    
    logger = logging.getLogger(__name__)
    
    @task(base=CallbackTask)
    def quota_check(item_param):
        logger.info('start')
        return

     14

    result = add.delay(44)

    >>> result.ready()
    False
    

    You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

    >>> result.get(timeout=1)
    8
    

    In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

    >>> result.get(propagate=False)
    

    If the task raised an exception you can also gain access to the original traceback:

    >>> result.traceback


    15 添加定时任务的一种方法
    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery()
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    
        # Calls test('world') every 30 seconds
        sender.add_periodic_task(30.0, test.s('world'), expires=10)
    
        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s('Happy Mondays!'),
        )
    
    @app.task
    def test(arg):
        print(arg)
    另一种方法:
    Example: Run the tasks.add task every 30 seconds.
    
    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'

     16 查看结果:

    from celery.result import AsyncResult
    from celery_app_task import cel
    
    async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

    ==========================

    ret=task1.add.apply_async(args=[2, 8],queue="email",routing_key="email")
    print('hello world')
    print(ret.status)
    print(ret.id)

    print(ret.result) # NONE
    print(ret.get()) #没有加timeout,所以阻塞住了,直到返回结果10
    print(ret.result) # 10

    17

     
  • 相关阅读:
    jquery实现选项卡(两句即可实现)
    常用特效积累
    jquery学习笔记
    idong常用js总结
    织梦添加幻灯片的方法
    LeetCode "Copy List with Random Pointer"
    LeetCode "Remove Nth Node From End of List"
    LeetCode "Sqrt(x)"
    LeetCode "Construct Binary Tree from Inorder and Postorder Traversal"
    LeetCode "Construct Binary Tree from Preorder and Inorder Traversal"
  • 原文地址:https://www.cnblogs.com/testzcy/p/11649936.html
Copyright © 2011-2022 走看看