zoukankan      html  css  js  c++  java
  • celery 笔记

    参考:https://blog.csdn.net/tichimi3375/article/details/82415412

    中文翻译:https://www.celerycn.io/      https://blog.csdn.net/weixin_40475396/article/details/80439781

    官网:http://docs.celeryproject.org/en/latest/index.html

    注意事项:

    1 cd到tasks.py同级目录中,执行命令

     2 导入配置的一种方式

    3 my_task.apply_async((2, 2), queue='my_queue', countdown=10) 任务my_task将会被发送到my_queue队列中,并且在发送10秒之后执行。

     4 

     5

    group: 一组任务并行执行,返回一组返回值,并可以按顺序检索返回值。

      chain: 任务一个一个执行,一个执行完将执行return结果传递给下一个任务函数

    from proj.tasks import my_task1
    from proj.tasks import my_task2
    from proj.tasks import my_task3
    from celery import group
     
    # 将多个signature放入同一组中
    my_group = group((my_task1.s(10, 10), my_task2.s(20, 20), my_task3.s(30, 30)))
    ret = my_group() # 执行组任务
    print(ret.get())  # 输出每个任务结果
    from proj.tasks import my_task1
    from proj.tasks import my_task2
    from proj.tasks import my_task3
    from celery import chain
     
    # 将多个signature组成一个任务链
    # my_task1的运行结果将会传递给my_task2
    # my_task2的运行结果会传递给my_task3
    my_chain = chain(my_task1.s(10, 10) | my_task2.s(20) | my_task3.s(30))
    ret = my_chain()  # 执行任务链
    print(ret.get())  # 输出最终结果

    6 my_task1.apply_async(queue='queue1')通过apply_aynsc()方法来设置任务发送到那个队列中

    7 celery -A proj worker --loglevel=info -Q queue1,queue2 设置一个worker服务器处理两个队列中的任务

    8 celery beat是一个调度器,它可以周期内指定某个worker来执行某个任务。

    启动woker处理周期性任务: celery -A proj worker --loglevel=info --beat

    beat_schedule = {
        'every-5-minute':
            {
                'task': 'proj.tasks.period_task',
                'schedule': 5.0,
                'args': (16, 16),
            },
        'add-every-monday-morning': {
            'task': 'proj.tasks.period_task',
            'schedule': crontab(hour=7, minute=30, day_of_week=1),
            'args': (16, 16),
        },
     
    }

    9

    from celery import Celery
     
    # 创建celery实例
    app = Celery('demo')
    app.config_from_object('proj.celeryconfig')
     
    # 自动搜索任务
    app.autodiscover_tasks(['proj'])

     10 定义app时要指定名称,否则如下

    当这个模块运行,任务将以前缀 __main__ 命名,但是当该模块被其他进程引入来运行一个任务,这个任务的名称将以前缀 tasks 命名(即这个模块的真实名称)

    【app名称不指定,则task的名称不固定,就不方便根据任务名称映射出实际任务函数】

    from celery import Celery
    app = Celery()   --app未指定名称
    
    @app.task
    def add(x, y): return x + y
    
    if __name__ == '__main__':
        app.worker_main()

    最佳实践如下:

    >>> app = Celery('tasks')
    >>> app.main
    'tasks'
    
    >>> @app.task
    ... def add(x, y):
    ...     return x + y
    
    >>> add.name
    tasks.add

    参考:https://blog.csdn.net/libing_thinking/article/details/78541171

    11 客户端导入模块 myapp.tasks 时使用 .tasks ,而工作单元导入模块使用 myapp.tasks, 他们产生的名称会不匹配,任务调用时工作单元会报 NotRegistered 错误。

    >>> from project.myapp.tasks import mytask
    >>> mytask.name
    'project.myapp.tasks.mytask'
    
    >>> from myapp.tasks import mytask
    >>> mytask.name
    'myapp.tasks.mytask'

    基于这一点,你必须在导入模块时保持一致,这也是 python 的最佳实践

    参考:https://blog.csdn.net/libing_thinking/article/details/78547816

    12 

    celery_demo                    # 项目根目录
        ├── celery_app             # 存放 celery 相关文件
        │   ├── __init__.py
        │   ├── celeryconfig.py    # 配置文件
        │   ├── task1.py           # 任务文件 1
        │   └── task2.py           # 任务文件 2
        └── client.py              # 应用程序

    执行 python client.py 就生产出了task

    在celery_demo 目录下执行 celery worker -A celery_app -l info -Q email 就执行了任务

    13

    https://www.cnblogs.com/kangoroo/p/6588615.html

    from celery.app.task import Task
    
    class CallbackTask(Task):
    
        def __init__(self):
            super(CallbackTask, self).__init__()
    
    def on_success(self, retval, task_id, args, kwargs):
            try:
                item_param= json.loads(args[0])
                logger.info('[task_id] %s, [task_type] %s, finished successfully.' % (task_id, item_param.get('task_type')))
            except Exception, ex:
                logger.error(traceback.format_exc())
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            try:
                item_param = json.loads(args[0])
                logger.error(('Task {0} raised exception: {1!r}
    {2!r}'.format(
                        task_id, exc, einfo.traceback)))
            except Exception, ex:
                logger.error(traceback.format_exc())
    from celery import task
    from common.callback import CallbackTask
    
    logger = logging.getLogger(__name__)
    
    @task(base=CallbackTask)
    def quota_check(item_param):
        logger.info('start')
        return

     14

    result = add.delay(44)

    >>> result.ready()
    False
    

    You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

    >>> result.get(timeout=1)
    8
    

    In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

    >>> result.get(propagate=False)
    

    If the task raised an exception you can also gain access to the original traceback:

    >>> result.traceback


    15 添加定时任务的一种方法
    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery()
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    
        # Calls test('world') every 30 seconds
        sender.add_periodic_task(30.0, test.s('world'), expires=10)
    
        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s('Happy Mondays!'),
        )
    
    @app.task
    def test(arg):
        print(arg)
    另一种方法:
    Example: Run the tasks.add task every 30 seconds.
    
    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'

     16 查看结果:

    from celery.result import AsyncResult
    from celery_app_task import cel
    
    async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

    ==========================

    ret=task1.add.apply_async(args=[2, 8],queue="email",routing_key="email")
    print('hello world')
    print(ret.status)
    print(ret.id)

    print(ret.result) # NONE
    print(ret.get()) #没有加timeout,所以阻塞住了,直到返回结果10
    print(ret.result) # 10

    17

     
  • 相关阅读:
    免费证书Let’s Encrypt
    kubernetes中使用ServiceAccount创建kubectl config 文件
    kubectl alias auto complete
    kubernetes dashboard permission errors
    du 与df 统计系统磁盘不一致原因与解决方法
    大访问量、高并发网站优化
    React的Sass配置
    转:Zepto的使用以及注意事项
    转: zepto的使用方法
    Extjs4 修改combox中store的数据
  • 原文地址:https://www.cnblogs.com/testzcy/p/11649936.html
Copyright © 2011-2022 走看看