zoukankan      html  css  js  c++  java
  • Celery配置

    Celery 配置

    最近项目中需要做一些定时任务的工作,之前都是用 LInux 的Crontab 但是任务多了之后 不好维护也没有什么监控的措施。所以考虑使用Celery 来解决这一问题。

    1.安装

    pip install celery-with-redis
    

    注意:其实celery 支持多中broker

    Name Status Monitoring Remote Control
    RabbitMQ Stable Yes Yes
    Redis Stable Yes Yes
    Amazon SQS Stable No No
    Zookeeper Experimental No No

    可以看见redis 和RabbitMq支持的是最好的,其中官方推荐RabbitMq,因为redis有断电或者重启丢失数据的风险。不过在这里我因为方便使用的是redis

    2 配置

    安装之后开始写配置文件 celeryconfig.py

    BROKER_URL = 'redis://127.0.0.1:6379/1' #消息队列选用
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'#结果存储
    CELERY_TASK_SERIALIZER = 'msgpack'#任务序列化方式
    CELERY_RESULT_SERIALIZER = 'json'#结果序列化方式
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
    CELERY_ACCEPT_CONTENT = ['json', 'msgpack']#接收序列化方式,masgpack序列化的方式效率很高。
    
    
    创建celery 实例
    app = Celery('app', include=['YoutProjectName.tasks'])
    
    app.config_from_object('YoutProjectName.celeryconfig')  # 设置环境变量
    

    3运行代码

    创建YouProjName.tasks.py

    # -*- coding: utf-8 -*-
    #因为celery 不支持隐式导入
    from __future__ import absolute_import, unicode_literals
    import random
    #引入app实例
    from CronTab import celery_app
    #引入日志类
    from celery.utils.log import get_task_logger
    
    logger = get_task_logger(__name__)
    
    #celery 装饰器,可以在运行的时候讲任务加到队列 这里是注册,将任务加到了注册中心,这样才能消费者才能收到
    @celery_app.task()
    def mul(x, y):
        total = x * (y * random.randint(3, 100))
        return total
    
    
    @celery_app.task()
    def xsum(numbers):
        return sum(numbers)
    
    
    @celery_app.task()
    def xsum3(numbers):
        return sum(numbers)
    

    打开terminal开始运行

    celery -A CronTab worker -l info 可以看到如下输出

     -------------- celery@bogon v3.1.25 (Cipater)
    
    ----  ----- 
    
    --- * ***  * -- Darwin-17.3.0-x86_64-i386-64bit
    
    -- * -  --- 
    
    - ** ---------- [config]
    - ** ---------- .> app:         app:0x1033ad978  
    - ** ---------- .> transport:   redis://127.0.0.1:6379/1
    - ** ---------- .> results:     redis://127.0.0.1:6379/0
    - *** --- * --- .> concurrency: 8 (prefork)
      -- *** ---- 
      --- * ----- [queues]
       -------------- .> default          exchange=tasks(topic) key=task.#
    
    

    Options:
    -A APP, --app=APP app instance to use (e.g. module.attr_name) 指定实例app
    -b BROKER, --broker=BROKER 指定broker

    --loader=LOADER name of custom loader class to use.
    --config=CONFIG Name of the configuration module
    --workdir=WORKING_DIRECTORY

    -C, --no-color
    -q, --quiet
    --version show program's version number and exit
    -h, --help show this help message and exit

    -A 是指定项目 worker 执行,

    -l 日志级别

    再打开一个terminal 开始

    消费

    In [1]: from apps.message import tasks
    
    In [2]: tasks.div.delay(1,2)
    
    Out[2]: <AsyncResult: 7b8db8fc-3119-48c5-8d95-c74a283ae8b2>
    
    

    注意调用的时候要加delay 语句,这样的话,就会发送给任务队列,让任务队列来执行该语句。

    这个时候我们再切回到worker进程

    消费

    [2018-02-14 10:00:53,963: INFO/MainProcess] Task apps.message.tasks.div[7b8db8fc-3119-48c5-8d95-c74a283ae8b2] succeeded in 0.008831199025735259s: 0.5

    可以看见就这样队列执行了你的任务,这就是celery 任务队列生产消费的过程

    使用不同的队列

    当你有很多任务需要执行的时候,不要只使用默认的queue,这样会相互影响,并且拖慢任务执行的,导致重要的任务不能被快速的执行。

    CELERY_QUEUES = (  # 定义任务队列
    
        Queue('default', routing_key='task.#'),  # 路由键以“task.”开头的消息都进default队列
    
        Queue('web_tasks', routing_key='web.#'),  # 路由键以“web.”开头的消息都进web_tasks队列
    
    )
    
    CELERY_DEFAULT_EXCHANGE = 'tasks'  # 默认的交换机名字为tasks
    
    CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'  # 默认的交换类型是topic
    
    CELERY_DEFAULT_ROUTING_KEY = 'task.default'  # 默认的路由键是task.default,这个路由键符合上面的default队列
    
    CELERY_ROUTES = {
    
        'apps.message.tasks.add': {  # tasks.add的消息会进入web_tasks队列
    
            'queue': 'web_tasks',
    
            'routing_key': 'web.add',
        },
        'apps.message.tasks.div': {  # tasks.add的消息会进入web_tasks队列
    
            'queue': 'web_tasks',
    
            'routing_key': 'web.div',
        }
    }
    

    然后执行命令

    celery -A CronTab worker -l info -Q web_task

    这样的话,只会执行文在web_task中的任务,其他任务都不在执行

    启动多个workers执行不同的任务

    在同一台机器上,对于优先级不同的任务最好启动不同的worker去执行,比如把实时任务和定时任务分开,把执行频率高的任务和执行频率低的任务分开,这样有利于保证高优先级的任务可以得到更多的系统资源,同时高频率的实时任务日志比较多也会影响实时任务的日志查看,分开就可以记录到不同的日志文件,方便查看。

    $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
    $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2.%h
    $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3.%h
    

    可以像这样启动不同的worker,%h可以指定hostname,详细说明可以查看官方文档
    高优先级的任务可以分配更多的concurrency,但是并不是worker并法数越多越好,保证任务不堆积就好。

    定时任务

    在 celeryconfig.py 文件中添加如下配置

    CELERYBEAT_SCHEDULE = {
    
        'add': {
            'task': 'apps.message.tasks.add',
            'schedule': timedelta(seconds=1),
            'args': (16, 16)
        }
    

    然后新开一个命令行启动

    celery beat -A CronTab

    可以看见任务开始不停的向队列发送

    [2018-02-14 09:25:42,689: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
    [2018-02-14 09:25:43,689: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
    [2018-02-14 09:25:44,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
    [2018-02-14 09:25:45,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
    [2018-02-14 09:25:46,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
    [2018-02-14 09:25:47,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
    [2018-02-14 09:25:48,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
    [2018-02-14 09:25:49,691: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
    
    
    

    回到worker 可以看到,开始没秒钟接收并且消费了

    [2018-02-14 11:09:06,332: INFO/Worker-8] apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6]: Executing task id 36490b36-9f3a-4158-aa2e-a6640d4607b6, args: [16, 16] kwargs: {}
    
    [2018-02-14 11:09:06,332: INFO/Worker-1] apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6]: Executing task id 36490b36-9f3a-4158-aa2e-a6640d4607b6, args: [16, 16] kwargs: {}
    
    [2018-02-14 11:09:06,333: INFO/MainProcess] Task apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6] succeeded in 0.001320198003668338s: 32
    
    [2018-02-14 11:09:06,333: INFO/MainProcess] Task apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6] succeeded in 0.0014644850161857903s: 32
    
    
  • 相关阅读:
    一步步学习SPD2010--第十章节--SP网站品牌化(4)--创建并关联CSS文件
    一步步学习SPD2010--第十章节--SP网站品牌化(3)--在内容页中识别样式
    一步步学习SPD2010--第十章节--SP网站品牌化(2)--在CSS中识别样式
    一步步学习SPD2010--第十章节--SP网站品牌化(1)--设置CSS和颜色编码页面编辑器选项
    一步步学习SPD2010--第十章节--SP网站品牌化
    一步步学习SPD2010--第九章节--使用可重用工作流和工作流表单(14)--关键点
    一步步学习SPD2010--第九章节--使用可重用工作流和工作流表单(13)--修改任务表单
    一步步学习SPD2010--第九章节--使用可重用工作流和工作流表单(12)--给初始表单添加关联字段
    一步步学习SPD2010--第九章节--使用可重用工作流和工作流表单(11)--修改关联表单
    matplotlib库疑难问题---1、解决中文乱码问题
  • 原文地址:https://www.cnblogs.com/maxaimee/p/8448725.html
Copyright © 2011-2022 走看看