zoukankan      html  css  js  c++  java
  • Python之celery

    一、celery简介

      Celery是一个Python开发的异步分布式任务调度模块。celery本身不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbing, redis, 数据库等。

      broker是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,进行对于的程序执行。好吧,这个邮箱可以看成是一个消息队列。那么什么又是backend,通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。对于 brokers,官方推荐是rabbitmq和redis,至于backend,就是数据库啦。为了简单起见,我们都用redis。

      redis连接URL的格式为:

      redis://password@hostname:port/db_number

    二、celery小例子

      1.在Linux上要先启动redis(./redis-server &)。

      2.程序代码如下:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # @Time    :2017/12/19 15:12
    # @Author  :huangdongju
    # @File    :17-2.py
    
    from celery import Celery
    
    broker = "redis://192.168.203.12:6379/5"  #5代表第五个数据库
    backend = "redis://192.168.203.12:6379/6"
    app = Celery("17-2",broker=broker,backend=backend)
    
    @app.task   #注册到任务中去
    def add(x,y):
        return x + y
    
    
    re = add.delay(10,20)
    print (re.result)
    print (re.ready)
    print (re.get(timeout = 1))
    print (re.status)

      3.将worker放置在Linux上,在Linux上进行如下配置(Python的版本至少为2.7.0以上)。

      # pip install redis

      # pip install celery

      # cd /usr/local/src

      # mkdir celery

      # cd celery

      # vim 17-2.py  (要与之前的文件名相同)

      # celery -A 17-2 worker - l info

         4.结果

     在Linux上输出的日志为:

     三、celery多实例

      celery可以支持多台不通的计算机执行不同的任务或相同的任务。如果要说celery的分布式应用的的话,那就是celery的消息路由机制,就要提一下AMQP协议。具体的可以查看AMQP文档。简单来说就是可以有多个消息队列(Message Queue),不同的消息可以指定发给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routing key ,Exchange通过routing key来把消息路由(routes)到不同的Message Queue中去。实例代码如下:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # @Time    :2017/12/20 16:19
    # @Author  :huangdongju
    # @File    :demon3.py
    import time
    from celery import Celery
    
    app = Celery()
    app.config_from_object("celeryconfig")
    
    @app.task
    def taskA(x,y):
        return x*y
    
    @app.task
    def taskB(x,y,z):
        return x+y+z
    
    @app.task
    def add(x,y):
        return x+y



    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # @Time :2017/12/20 9:44
    # @Author :huangdongju
    # @File :celeryconfig.py
    from kombu import Queue, Exchange

    BROKER_URL = "redis://192.168.203.12:6379/1"
    CELERY_RESULT_BACKEND = "redis://192.168.203.12:6379/2" #要大写

    CELERY_QUEUES = { #配置消息队列
    Queue("default",Exchange("default"),routing_key = "default"),
    Queue("for_task_A",Exchange("for_task_A"),routing_key = "for_task_A"),
    Queue("for_task_B",Exchange("for_task_B"),routing_key = "for_task_B")

    }

    CELERY_ROUTES ={
    "demon3.taskA":{"queue":"for_task_A","routing_key":"for_task_A"},
    "demon3.taskB":{"queue":"for_task_B","routing_key":"for_task_B"}
    }





      在服务器上要同步demon3.py与celeryconfig.py这两个文件。然后打开两个进程:

    celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
    
    celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

    结果如下:

    200
    SUCCESS
    6
    SUCCESS
    None
    PENDING

      在上述结果中,看到状态有为PENDING,表示没有执行,这是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。

    四、celery与定时任务

      在celery中执行定时任务非常简单,只需要设置celery对象中的CELERY_SCHEDULE变量。实例代码为:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # @Time    :2017/12/20 16:19
    # @Author  :huangdongju
    # @File    :demon3.py
    import time
    from celery import Celery
    
    app = Celery()
    app.config_from_object("celeryconfig")
    
    @app.task
    def taskA(x,y):
        return x*y
    
    @app.task
    def taskB(x,y,z):
        return x+y+z
    
    @app.task
    def add(x,y):
        return x+y


    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # @Time :2017/12/20 9:44
    # @Author :huangdongju
    # @File :celeryconfig.py
    from kombu import Queue, Exchange

    BROKER_URL = "redis://192.168.203.12:6379/1"
    CELERY_RESULT_BACKEND = "redis://192.168.203.12:6379/2" #要大写

    CELERY_QUEUES = { #配置消息队列
    Queue("default",Exchange("default"),routing_key = "default"),
    Queue("for_task_A",Exchange("for_task_A"),routing_key = "for_task_A"),
    Queue("for_task_B",Exchange("for_task_B"),routing_key = "for_task_B")

    }

    CELERY_ROUTES ={
    "demon3.taskA":{"queue":"for_task_A","routing_key":"for_task_A"},
    "demon3.taskB":{"queue":"for_task_B","routing_key":"for_task_B"}
    }

    #定时任务
    CELERY_TIMEZONE = 'UTC'
    CELERY_SCHEDULE = {
    'taskA_schedule':{
    'task':'demon3.taskA',
    'schedule':20,
    'args':(5,6)
    },

    'taskB_schedule':{
    'task':'demon3.taskB',
    'schedule':50,
    'args':(100,200,300)
    },

    'add_schedule':{
    'task':'demon3.add',
    'schedule':10,
    'args':(110,120)
    },
    }
  • 相关阅读:
    Linux三剑客awk命令试题
    Linux综合练习题
    Linux系统用户角色划分
    Linux添加磁盘fdisk命令
    Linux的七种运行级别
    Linux 文件类型
    Linux开机启动程序
    Linux软件安装
    linux运行级别
    Linux /etc目录下的重要配置文件
  • 原文地址:https://www.cnblogs.com/huangdongju/p/8066796.html
Copyright © 2011-2022 走看看