zoukankan      html  css  js  c++  java
  • day44-Celery异步分布式

    celery异步分布式
    Celery是一个python开发的异步分布式任务调度模块。
    Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbimq,redis, 数据库等。

    这里我们使用redis
    连接url的格式为:
    redis://:password@hostname:port/db_number
    例如:
    BROKER_URL = 'redis://localhost:6379/0'

    安装celery
    pip install celery
    pip install redis

    在服务器上安装redis服务器,并启动redis
    第一个简单的例子:

    [root@localhost celery]# cat test.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
     
    from celery import Celery

    broker = "redis://10.37.208.40:6379/5"
    backend = "redis://10.37.208.40:6379/6"
    app = Celery("test",broker=broker,backend=backend)

    @app.task
    def add(x,y):
    return x+y
     

    启动worker
    #celery -A ling worker -l info


    生产者

    启动worker
    #celery -A test worker -l info
    
    
    生产者
    
    form test import add
    a = add.delay(10, 20)
    print(a.result)    #获取结果
    print(a.ready)        #是否处理
    print(a.get(timeout=1))        #获取结果
    print(a.status)        #是否处理

    celery模块调用
    既然celery 是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,
    celery可以支持多台不通的计算机执行不通的任务或相同的任务
    如果要说celery的分布式应用的话,我认为要提到celery的消息路由机制,就要提一下AMQP协议,
    具体的可以查看AMQP的文档,简单地说就是可以有多个消息队列(Message Queue). 不同的消息可以指定发送给不同的Message Queue
    而这是通过Exchange来实现。发送消息到Message Queue中时,可以指定routing key, Exchange通过routing key来把消息路由(routes)到不通的Message Queue中

    实例:

    多worker,多队列
     cat /usr/local/src/celery/demon3.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # @time: 2018/1/7 14:03
    # Author: caicai
    # @File: demon3.py
    
    from celery import  Celery
    
    app=Celery()
    app.config_from_object("celeryconfig")
    
    @app.task
    def taskA(x,y):
        return x*y
    
    @app.task
    def taskB(x,y,z):
        return x+y+z
    @app.task
    def add(x,y):
        return x+y
    
    配置文件一般单独写在一个文件中。
    
    配置文件:
    
    cat /usr/local/src/celery/celeryconfig.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # @time: 2018/1/7 14:06
    # Author: caicai
    # @File: celeryconfig.py
    from kombu import Queue, Exchange
    
    BROKER_URL = "redis://10.37.208.40:6379/1"
    CELERY_RESULT_BACKEND = "redis://10.37.208.40:6379/2"
    CELERY_QUEUES = {
        Queue("default",Exchange("default"),routing_key="default"),
        Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
        Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
    
    
    }
    
    CELERY_ROUTES = {
        'demon3.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},
        'demon3.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}
    
    }
    
    服务端
    启动一个worker来指定taskA
    celery -A tasks worker -l info -n workerA.%h -Q for_task_A
    celery -A tasks worker -l info -n workerB.%h -Q for_task_B
    
    客户端执行
    import time
    
    from demon3 import *
    
    r1 = taskA.delay(20,10)
    time.sleep(1)
    print(r1)
    print(r1.result)
    r2 = taskB.delay(10,20,30)
    time.sleep(1)
    print(r2.result)
    print(r2.status)
    r3 = add.delay(100,200)
    print(r3.result)
    print(r3.status)
    
    输出结果:
    46adbdca-4e87-4d97-8b82-6883b7c3f64a
    200
    60
    SUCCESS
    None
    PENDING
    
    我们看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。
    celery -A demon3 worker -l info -n worker.%h -Q celery 
    
    print(r3.status)    #SUCCESS
    Celery与定时任务
    下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:
    
    CELERY_TIMEZONE = 'UTC'
    CELERYBEAT_SCHEDULE = {
        'taskA_schedule' : {
            'task':'tasks.taskA',
            'schedule':20,
            'args':(5,6)
        },
        'taskB_scheduler' : {
            'task':"tasks.taskB",
            "schedule":200,
            "args":(10,20,30)
        },
        'add_schedule': {
            "task":"tasks.add",
            "schedule":10,
            "args":(1,2)
        }
    }
    
    注意格式,否则会有问题
    
    
    服务器端启动:
    celery -A demon3 beat
  • 相关阅读:
    python标准库学习-SimpleHTTPServer
    迁移cnblog博客
    zabbix监控使用
    20 个 OpenSSH 最佳安全实践
    编写基本的 udev 规则
    Linux.Siggen.180
    在 CentOS 7.0 上安装配置 Ceph 存储
    常用 GDB 命令中文速览
    Kubernetes TLS认证
    音乐下载api
  • 原文地址:https://www.cnblogs.com/wxp997/p/8227892.html
Copyright © 2011-2022 走看看