zoukankan      html  css  js  c++  java
  • python celery 多work多队列

    1.Celery模块调用

    既然celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,celery可以支持多台不通的计算机执行不同的任务或者相同的任务。

    如果要说celery的分布式应用的话,就要提到celery的消息路由机制,AMQP协议。具体的可以查看AMQP的文档。简单地说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的Message Queue中去。

    多worker,多队列,实例:

    1.在服务器上编写文件tasks.py。首先定义一个Celery的对象,然后通过celeryconfig.py对celery对象进行设置。之后又分别定义了三个task,分别是taskA, taskB和add。

    #!/usr/bin/env
    #-*-conding:utf-8-*-
    from celery import Celery,platforms
    platforms.C_FORCE_ROOT = True
    
    app = Celery()
    app.config_from_object("celeryconfig")
    
    @app.task
    def tashA(x,y):
    	return x*y
    
    @app.task
    def taskB(x,y,z):
    	return x+y+z
    
    @app.task
    def add(x,y):
    	return x+y

    2.编写celeryconfig.py文件。

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    from kombu import Exchange,Queue
    from celery import platforms
    platforms.C_FORCE_ROOT = True
    
    BROKER_URL = "redis://localhost:6379/7" 
    CELERY_RESULT_BACKEND = "redis://localhost:6379/8"
    
    CELERY_QUEUES = (
    Queue("default",Exchange("default"),routing_key="default"),
    Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
    Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B") 
    )
    
    CELERY_ROUTES = {
    'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
    'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
    }

    3.启动worker来指定task

    celery -A tasks worker -l info -n workerA.%h -Q for_task_A

    celery -A tasks worker -l info -n workerB.%h -Q for_task_B

    4.传入参数

    将上面两个文件导出到pycharm中:

    编写文件传参:

    from tasks import *
    re1 = taskA.delay(100, 200)
    re2 = taskB.delay(1,2, 3)print(re3.status)          #查看re3的状态
    print(re3.id)               #查看re3的id
    

    运行之后可见:taskA,taskB都已正常执行。

    5.我们可以看到add(re3)的状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。

    celery -A tasks worker -l info -n worker.%h -Q celery 

    这样我们再次运行pycharm就可以看见add也被运行了,并且redis数据库中也有该id了。

    2.Celery与定时任务

    1.在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。
    下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:

    CELERY_TIMEZONE = 'UTC'
    CELERYBEAT_SCHEDULE = {
        'taskA_schedule' : {
            'task':'tasks.taskA',
            'schedule':20,
            'args':(5,6)
        },
        'taskB_scheduler' : {
            'task':"tasks.taskB",
            "schedule":200,
            "args":(10,20,30)
        },
        'add_schedule': {
            "task":"tasks.add",
            "schedule":10,
            "args":(1,2)
        }
    }

    2.Celery启动定时任务

    celery -A tasks worker -l info -n workerA.%h -Q for_task_A -B

    启动完成后:

    taskA每20秒执行一次taskA.delay(5, 6)

    taskB每200秒执行一次taskB.delay(10, 20, 30)

    Celery每10秒执行一次add.delay(1, 2)

  • 相关阅读:
    What is EJB
    Redis连接工具类
    MyBatis单列工厂的实现
    TCP和UDP的区别(Socket)
    webSocket协议与Socket的区别
    OSI七层模型
    Http协议和HTTPS协议
    Cookie和Session
    Request库使用response.text返回乱码问题
    Selenium元素定位问题
  • 原文地址:https://www.cnblogs.com/zknublx/p/9149891.html
Copyright © 2011-2022 走看看