zoukankan      html  css  js  c++  java
  • 四、Celery 路由学习篇

    0、Celery路由方式

    1、自动路由:定义好规则,调用任务的时候,无需指定路由,调用任务的函数delay【celery启动的时候,需要指定队列名字,才可以进行正常的运行】
    2、手动路由:调用方法的时候,指定路由和队列或路由的key,调用任务函数apply_async 【celery启动的时候,无需指定队列名字,随worker一起启动】

    演示代码下载地址

    https://github.com/ygbh/celery_django_project

    1、自动路由

    1.1、自动路由的逻辑概念

    自动路由,即需要提前定义好路由规则,直接使用delay或apply_async方法来调用任务

    1.2、项目演示代码准备

    创建2个app,不同app路由到不同队列和交换接口进行任务的处理
    这里演示的app为:
    app01:主要做运算处理,例如求和
    app_log:主要做log的处理

    1.3、app01的应用views.py 代码:

    def async_route_add_task(request):
        """
          使用调用apply_async,相乘的任务,主要演示路由分发
        :param request:
        :return:
        """
        arg1 = 1
        arg2 = 2
        result = add.apply_async(args=(arg1, arg2,),
                                 queue='app01',
                                 priority=0,
                                 exchange='app01')
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})
    
    
    def sync_route_add_task(request):
        """
            使用delay,调用相乘的任务,主要演示路由分发
        :param request:
        :return:
        """
        arg1 = 1
        arg2 = 2
        result = add.delay(arg1, arg2)
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})

    1.4、app_log应用 views.py 代码 

    def async_route_handler_log(request):
        log_content = 'async_route_handler_log'
        result = handler_log.apply_async(args=(log_content,),
                                         queue='app_log',
                                         priority=0,
                                         exchange='app_log')
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': [log_content], 'task_id': result.task_id, 'result': task_status.get()})
    
    
    def sync_route_handler_log(request):
        log_content = 'async_route_handler_log'
        result = handler_log.delay(log_content)
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': [log_content], 'task_id': result.task_id, 'result': task_status.get()})

    1.5、配置Django url路由

    urlpatterns = [
    path('async_add/', views.async_add_task), # 主演示指定exchange+queue+routing key,来任务调用,函数传普通参数
    path('sync_add/', views.sync_add_task),# 主演示,直接任务调用,函数传普通参数
    path('async_xsum/', views.async_xsum_task),
    path('sync_xsum/', views.sync_xsum_task),
    path('app01/async_add/', views.async_route_add_task), # app01,主要演示手动路由
    path('app01/sync_add/', views.sync_route_add_task), # app01,主要演示自动路由
    path('app_log/async_log/', log_views.async_route_handler_log), # app_log,主要演示手动路由
    path('app_log/sync_log/', log_views.sync_route_handler_log), # app_log,主要演示自动路由
    ]

    1.6、在celery_settings.py 配置路由规则

    # 定义自动路由规则,主要是给delay或apply_async函数,调用任务的时候使用,【元组方式定义】,顺序匹配执行
    # 参考官方文档:https://docs.celeryproject.org/en/stable/userguide/routing.html#exchange-types
    # CELERY_TASK_ROUTES = ([
    #                           ('app01.tasks.*', {"queue": "app01"}),
    #                           ('app_log.tasks.*', {"queue": "app_log"}),
    #                       ],)
    
    # 定义自动路由规则,主要是给delay或apply_async函数,调用任务的时候使用,【字典方式定义】 
    CELERY_TASK_ROUTES = { 
    'app01.tasks.*': {"queue": "app01"},
    'app_log.tasks.*': {"queue": "app_log"}
    }

    1.7、指定队列启动celery worker服务

    # 启动只接收队列app01的数据处理任务服务
    celery -A django_celery_project worker -Q app01 -p eventlet -n app01 -l info
    
    # 启动只接收队列app_log的数据处理任务服务
    celery -A django_celery_project worker -P eventlet -l info -n app_log -Q app_log
    
    # 启动只接收队列celery【即是默认队列】的数据处理任务服务
    celery -A django_celery_project worker -P eventlet -l info -n celery

    1.8、测试效果

    1.8.1、app01,接收任务和处理任务正常

     

    1.8.2、app_log,接收任务和处理任务正常

     

    1.8.3、默认的celery,接收任务和处理任务正常

     

    2、手动路由

    2.1、自动路由的逻辑概念

    主要是调度路由的时候,要指定队列,交换接口,路由的键[key],由rabbitMQ自动路由到指定的队列,
    该监听函数的队列,监听到有数据的时候,即接收任务处理。

    2.2、项目演示代码准备

    创建2个应用,不同应用根据指定队列,交换接口,路由的键[key],选择路由任务的调度处理
    
    这里演示的app为:
    app01:主要做运算处理,例如:求和 
    app_log:主要做log的处理

    2.3、app01的应用views.py 代码:

    def async_route_add_task(request):
        """
          使用调用apply_async,相乘的任务,主要演示路由分发
        :param request:
        :return:
        """
        arg1 = 1
        arg2 = 2
        result = add.apply_async(args=(arg1, arg2,),
                                 queue='feed_tasks',
                                 routing_key='task.add',
                                 priority=0,
                                 exchange='default', )
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})

    2.4、app_log应用 views.py 代码 

    def async_route_handler_log(request):
        log_content = 'async_route_handler_log'
        result = handler_log.apply_async(args=(log_content,))
        task_status = AsyncResult(result.task_id, app=result.app)
        return JsonResponse({'input_args': [log_content], 'task_id': result.task_id, 'result': task_status.get()})

    2.5、配置Django url路由

    urlpatterns = [
        path('async_add/', views.async_add_task),  # 主演示指定exchange+queue+routing key,来任务调用,函数传普通参数
        path('sync_add/', views.sync_add_task),  # 主演示,直接任务调用,函数传普通参数
        path('sync_add/', views.sync_add_task),  # 主演示,直接任务调用,函数传普通参数
        path('async_xsum/', views.async_xsum_task),  # 主演示,直接任务调用,函数传列表参数
        path('sync_xsum/', views.sync_xsum_task),  # 主演示,直接任务调用,函数传列表参数
        path('app01/async_add/', views.async_route_add_task),  # app01,主要演示手动路由
        path('app01/sync_add/', views.sync_route_add_task),  # app01,主要演示自动路由
        path('app_log/async_log/', log_views.async_route_handler_log),  # app_log,主要演示手动路由
        path('app_log/sync_log/', log_views.sync_route_handler_log),  # app_log,主要演示自动路由
    ]

     2.6、在celery_settings.py 配置路由规则[注意:这里需要把自动路由的信息注释或删除]

    # 定义队列规则,主要是给apply_async函数,调用任务的时候使用
    CELERY_TASK_QUEUES = {
        Queue("celery", Exchange("celery"), routing_key="celery.default"),  # 默认队列
        Queue("feed_tasks", Exchange(name="default", type='topic'), routing_key="task.#"),
        # 定义队列feed_tasks,从交换接口:default接收,并且过滤路由的key,主要演示手动路由的机制
        Queue("add_queue", Exchange("compute_node"), routing_key="add_task"),  # 定义队列:add_queue,绑定交换机:compute_node
        Queue("mul_queue", Exchange("compute_node"), routing_key="mul_task"),  # 定义队列:mul_queue,绑定交换机:compute_node
        Queue("xsum_queue", Exchange("compute_node"), routing_key="xsum_task")  # 定义队列:xsum_queue,绑定交换机:compute_node
    }

     2.7、指定队列启动celery worker服务

    # 启动worker 服务
    celery -A django_celery_project worker -P eventlet -l info -n celery

     2.8、测试效果

     2.9、手动路由配置完成

    3、路由的优先级选项【这里不在演示介绍,只是设置一下参数而且】

    3.1、规则

    优先级限制:0-9
    数字越来,越优先消费
    默认值是:0

    3.2、配置的方法

    3.2.1、在celery配置文件里面设置

    可以通过设置x-max-priority参数将队列配置为支持优先级 

    from kombu import Exchange, Queue
    
    app.conf.task_queues = [
        Queue('tasks', Exchange('tasks'), routing_key='tasks',
              queue_arguments={'x-max-priority': 10}),
    ]

    3.2.2、在调用任务函数的时候,进行设置优先级

    task.apply_async(priority=0)
    
    注意:
        调用时设置,会覆盖默认的配置

    4、Celery广播路由【因为比较少用,这里直接复制官方的设置方法,不做演示】

    4.1、定义广播路由的配置

    from kombu.common import Broadcast
    
    app.conf.task_queues = (Broadcast('broadcast_tasks'),) # 定义队列的名字
    app.conf.task_routes = {
        'tasks.reload_cache': {
            'queue': 'broadcast_tasks', # 路由的队列名字
            'exchange': 'broadcast_tasks' #交换的接口
        }
    }

    4.2、设置一个周期运行任务【周期任务一般在关闭运行结果,因为输出结果对业务没有什么作用:@shared_task(ignore_result=False)】

    from kombu.common import Broadcast
    from celery.schedules import crontab
    
    app.conf.task_queues = (Broadcast('broadcast_tasks'),)
    
    app.conf.beat_schedule = {
        'test-task': {
            'task': 'tasks.reload_cache',
            'schedule': crontab(minute=0, hour='*/3'),
            'options': {'exchange': 'broadcast_tasks'}
        },
    }

    5、Celery的优化

    默认:Celery对数据都是进行持久化【即:delivery_mode=0】,就算重启MQ,数据也不会丢失。
    如果数据允许丢失的话,可以关闭持久化,从而达到提升性能【即:delivery_mode=1】

    5.1、优化的方法

    设置方法1、
    from
    kombu import Exchange, Queue task_queues = ( Queue('celery', routing_key='celery'), Queue('transient', Exchange('transient', delivery_mode=1), routing_key='transient', durable=False), ) 设置方法2: task_routes = { 'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'} } 设置方法3: ask.apply_async(args, queue='transient')
  • 相关阅读:
    cocos2d 中判断CGPoint或者CGSize是否相等
    object-c cocos2d-x 写程序时注意调试的技巧
    object-c [self class] 和 [self _cmd]
    object-c 协议和委托
    ios cocos2d TexturePacker生成文件后的使用方法
    object-c 要理解协议的几个重要概念
    Linux Mint 没有 language support 语言支持解决方案
    cocos2d ARCH_OPTIMAL_PARTICLE_SYSTEM这个未定义的问题
    CCSpriteBatchNode的优化性能
    Codeforces Round #190 DIV.2 A. Ciel and Dancing
  • 原文地址:https://www.cnblogs.com/ygbh/p/13651887.html
Copyright © 2011-2022 走看看