0、Celery路由方式
1、自动路由:定义好规则,调用任务的时候,无需指定路由,调用任务的函数delay【celery启动的时候,需要指定队列名字,才可以进行正常的运行】 2、手动路由:调用方法的时候,指定路由和队列或路由的key,调用任务函数apply_async 【celery启动的时候,无需指定队列名字,随worker一起启动】
演示代码下载地址
https://github.com/ygbh/celery_django_project
1、自动路由
1.1、自动路由的逻辑概念
自动路由,即需要提前定义好路由规则,直接使用delay或apply_async方法来调用任务
1.2、项目演示代码准备
创建2个app,不同app路由到不同队列和交换接口进行任务的处理
这里演示的app为:
app01:主要做运算处理,例如求和
app_log:主要做log的处理
1.3、app01的应用views.py 代码:
def async_route_add_task(request): """ 使用调用apply_async,相乘的任务,主要演示路由分发 :param request: :return: """ arg1 = 1 arg2 = 2 result = add.apply_async(args=(arg1, arg2,), queue='app01', priority=0, exchange='app01') task_status = AsyncResult(result.task_id, app=result.app) return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()}) def sync_route_add_task(request): """ 使用delay,调用相乘的任务,主要演示路由分发 :param request: :return: """ arg1 = 1 arg2 = 2 result = add.delay(arg1, arg2) task_status = AsyncResult(result.task_id, app=result.app) return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})
1.4、app_log应用 views.py 代码
def async_route_handler_log(request): log_content = 'async_route_handler_log' result = handler_log.apply_async(args=(log_content,), queue='app_log', priority=0, exchange='app_log') task_status = AsyncResult(result.task_id, app=result.app) return JsonResponse({'input_args': [log_content], 'task_id': result.task_id, 'result': task_status.get()}) def sync_route_handler_log(request): log_content = 'async_route_handler_log' result = handler_log.delay(log_content) task_status = AsyncResult(result.task_id, app=result.app) return JsonResponse({'input_args': [log_content], 'task_id': result.task_id, 'result': task_status.get()})
1.5、配置Django url路由
urlpatterns = [
path('async_add/', views.async_add_task), # 主演示指定exchange+queue+routing key,来任务调用,函数传普通参数
path('sync_add/', views.sync_add_task),# 主演示,直接任务调用,函数传普通参数
path('async_xsum/', views.async_xsum_task),
path('sync_xsum/', views.sync_xsum_task),
path('app01/async_add/', views.async_route_add_task), # app01,主要演示手动路由
path('app01/sync_add/', views.sync_route_add_task), # app01,主要演示自动路由
path('app_log/async_log/', log_views.async_route_handler_log), # app_log,主要演示手动路由
path('app_log/sync_log/', log_views.sync_route_handler_log), # app_log,主要演示自动路由
]
1.6、在celery_settings.py 配置路由规则
# 定义自动路由规则,主要是给delay或apply_async函数,调用任务的时候使用,【元组方式定义】,顺序匹配执行 # 参考官方文档:https://docs.celeryproject.org/en/stable/userguide/routing.html#exchange-types # CELERY_TASK_ROUTES = ([ # ('app01.tasks.*', {"queue": "app01"}), # ('app_log.tasks.*', {"queue": "app_log"}), # ],) # 定义自动路由规则,主要是给delay或apply_async函数,调用任务的时候使用,【字典方式定义】
CELERY_TASK_ROUTES = {
'app01.tasks.*': {"queue": "app01"},
'app_log.tasks.*': {"queue": "app_log"}
}
1.7、指定队列启动celery worker服务
# 启动只接收队列app01的数据处理任务服务 celery -A django_celery_project worker -Q app01 -p eventlet -n app01 -l info # 启动只接收队列app_log的数据处理任务服务 celery -A django_celery_project worker -P eventlet -l info -n app_log -Q app_log # 启动只接收队列celery【即是默认队列】的数据处理任务服务 celery -A django_celery_project worker -P eventlet -l info -n celery
1.8、测试效果
1.8.1、app01,接收任务和处理任务正常
1.8.2、app_log,接收任务和处理任务正常
1.8.3、默认的celery,接收任务和处理任务正常
2、手动路由
2.1、自动路由的逻辑概念
主要是调度路由的时候,要指定队列,交换接口,路由的键[key],由rabbitMQ自动路由到指定的队列,
该监听函数的队列,监听到有数据的时候,即接收任务处理。
2.2、项目演示代码准备
创建2个应用,不同应用根据指定队列,交换接口,路由的键[key],选择路由任务的调度处理
这里演示的app为:
app01:主要做运算处理,例如:求和
app_log:主要做log的处理
2.3、app01的应用views.py 代码:
def async_route_add_task(request): """ 使用调用apply_async,相乘的任务,主要演示路由分发 :param request: :return: """ arg1 = 1 arg2 = 2 result = add.apply_async(args=(arg1, arg2,), queue='feed_tasks', routing_key='task.add', priority=0, exchange='default', ) task_status = AsyncResult(result.task_id, app=result.app) return JsonResponse({'input_args': [arg1, arg2], 'task_id': result.task_id, 'result': task_status.get()})
2.4、app_log应用 views.py 代码
def async_route_handler_log(request): log_content = 'async_route_handler_log' result = handler_log.apply_async(args=(log_content,)) task_status = AsyncResult(result.task_id, app=result.app) return JsonResponse({'input_args': [log_content], 'task_id': result.task_id, 'result': task_status.get()})
2.5、配置Django url路由
urlpatterns = [ path('async_add/', views.async_add_task), # 主演示指定exchange+queue+routing key,来任务调用,函数传普通参数 path('sync_add/', views.sync_add_task), # 主演示,直接任务调用,函数传普通参数 path('sync_add/', views.sync_add_task), # 主演示,直接任务调用,函数传普通参数 path('async_xsum/', views.async_xsum_task), # 主演示,直接任务调用,函数传列表参数 path('sync_xsum/', views.sync_xsum_task), # 主演示,直接任务调用,函数传列表参数 path('app01/async_add/', views.async_route_add_task), # app01,主要演示手动路由 path('app01/sync_add/', views.sync_route_add_task), # app01,主要演示自动路由 path('app_log/async_log/', log_views.async_route_handler_log), # app_log,主要演示手动路由 path('app_log/sync_log/', log_views.sync_route_handler_log), # app_log,主要演示自动路由 ]
2.6、在celery_settings.py 配置路由规则[注意:这里需要把自动路由的信息注释或删除]
# 定义队列规则,主要是给apply_async函数,调用任务的时候使用 CELERY_TASK_QUEUES = { Queue("celery", Exchange("celery"), routing_key="celery.default"), # 默认队列 Queue("feed_tasks", Exchange(name="default", type='topic'), routing_key="task.#"), # 定义队列feed_tasks,从交换接口:default接收,并且过滤路由的key,主要演示手动路由的机制 Queue("add_queue", Exchange("compute_node"), routing_key="add_task"), # 定义队列:add_queue,绑定交换机:compute_node Queue("mul_queue", Exchange("compute_node"), routing_key="mul_task"), # 定义队列:mul_queue,绑定交换机:compute_node Queue("xsum_queue", Exchange("compute_node"), routing_key="xsum_task") # 定义队列:xsum_queue,绑定交换机:compute_node }
2.7、指定队列启动celery worker服务
# 启动worker 服务 celery -A django_celery_project worker -P eventlet -l info -n celery
2.8、测试效果
2.9、手动路由配置完成
3、路由的优先级选项【这里不在演示介绍,只是设置一下参数而且】
3.1、规则
优先级限制:0-9
数字越来,越优先消费
默认值是:0
3.2、配置的方法
3.2.1、在celery配置文件里面设置
可以通过设置x-max-priority
参数将队列配置为支持优先级 :
from kombu import Exchange, Queue app.conf.task_queues = [ Queue('tasks', Exchange('tasks'), routing_key='tasks', queue_arguments={'x-max-priority': 10}), ]
3.2.2、在调用任务函数的时候,进行设置优先级
task.apply_async(priority=0)
注意:
调用时设置,会覆盖默认的配置
4、Celery广播路由【因为比较少用,这里直接复制官方的设置方法,不做演示】
4.1、定义广播路由的配置
from kombu.common import Broadcast app.conf.task_queues = (Broadcast('broadcast_tasks'),) # 定义队列的名字 app.conf.task_routes = { 'tasks.reload_cache': { 'queue': 'broadcast_tasks', # 路由的队列名字 'exchange': 'broadcast_tasks' #交换的接口 } }
4.2、设置一个周期运行任务【周期任务一般在关闭运行结果,因为输出结果对业务没有什么作用:@shared_task(ignore_result=False)】
from kombu.common import Broadcast from celery.schedules import crontab app.conf.task_queues = (Broadcast('broadcast_tasks'),) app.conf.beat_schedule = { 'test-task': { 'task': 'tasks.reload_cache', 'schedule': crontab(minute=0, hour='*/3'), 'options': {'exchange': 'broadcast_tasks'} }, }
5、Celery的优化
默认:Celery对数据都是进行持久化【即:delivery_mode=0】,就算重启MQ,数据也不会丢失。
如果数据允许丢失的话,可以关闭持久化,从而达到提升性能【即:delivery_mode=1】
5.1、优化的方法
设置方法1、
from kombu import Exchange, Queue task_queues = ( Queue('celery', routing_key='celery'), Queue('transient', Exchange('transient', delivery_mode=1), routing_key='transient', durable=False), ) 设置方法2: task_routes = { 'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'} } 设置方法3: ask.apply_async(args, queue='transient')