zoukankan      html  css  js  c++  java
  • django celery异步框架

    描述:实现运维平台的异步执行与定时任务,以下简单描述了安装过程及使用。
     
    安装django和celery
    pip install django
    pip install celery 
    pip install django-celery
     
    新建一个项目名为news
    root@bogon:~# django-admin startproject news  
     
    查看目录树
    root@bogon:~# tree news/
    news/
    ├── manage.py
    └── news
    ├── __init__.py
    ├── settings.py
    ├── urls.py
    └── wsgi.py
     
    定义一个celery实例 news/news/celery.py
    from __future__ import absolute_import
     
    import os
     
    from celery import Celery
     
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'news.settings')
     
    from django.conf import settings
     
    app = Celery('news')
     
    # Using a string here means the worker will not have to
    # pickle the object when using Windows.
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
     
     
    @app.task(bind=True)
    def debug_task(self):
      print('Request: {0!r}'.format(self.request))
     
    在django中加载应用news/news/__init__.py
    from __future__ import absolute_import
     
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    

     

    celery基本配置
    import djcelery
    djcelery.setup_loader()
     
    BROKER_URL = 'django://'
    # 以下为mq配置
    # BROKER_URL = "amqp://guest:guest@localhost:5672//"
     
    INSTALLED_APPS = (
      'djcelery',
      'kombu.transport.django',
    )
     
    celery队列配置以及详细说明
    # 以下是标准的redis配置
    BROKER_URL = 'redis://localhost:6379'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379'
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TIMEZONE = 'Asia/Shanghai'
     
     
    # rabbitmq 配置
    # 官网优化的地方也推荐使用c的librabbitmq
    CELERY_RESULT_BACKEND = "amqp"
    # celery任务执行结果的超时时间
    CELERY_TASK_RESULT_EXPIRES = 1200
    # celery worker的并发数
    CELERYD_CONCURRENCY = 50
    # celery worker 每次去rabbitmq取任务的数量
    CELERYD_PREFETCH_MULTIPLIER = 4
    # 每个worker执行了多少任务就会死掉
    CELERYD_MAX_TASKS_PER_CHILD = 40
    # 这是使用了django-celery默认的数据库调度模型,任务执行周期都被存在你指定的orm数据库中
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
    # 我做的rabbitmq和celery监控很完善所以这个任务超时时间比较短只有半小时
    CELERYD_TASK_TIME_LIMIT = 1800
    # 默认的队列,如果一个消息不符合其他的队列就会放在默认队列里面
    CELERY_DEFAULT_QUEUE = "default_dongwm"
     
    ## 以下是队列的一些配置
    CELERY_QUEUES = {
      "default_dongwm": { # 这是上面指定的默认队列
        "exchange": "default_dongwm",
        "exchange_type": "direct",
        "routing_key": "default_dongwm"
      },
      "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
        "routing_key": "topictest.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
      },
      "test2": { # test和test2是2个fanout队列,注意他们的exchange相同
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks",
      },
      "test": {
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks2",
      },
    }
    
    class MyRouter(object): def route_for_task(self, task, args=None, kwargs=None): if task.startswith('topictest'): return { 'queue': 'topicqueue', } # 我的dongwm.tasks文件里面有2个任务都是test开头 elif task.startswith('webui.tasks.test'): return { "exchange": "broadcast_tasks", } # 剩下的其实就会被放到默认队列 else: return None # CELERY_ROUTES本来也可以用一个大的含有多个字典的字典,但是不如直接对它做一个名称统配 CELERY_ROUTES = (MyRouter(), )
     
    创建一个应用和异步任务
    root@bogon:~# django-admin startapp webui
    root@bogon:~# cat webui/tasks.py
    from __future__ import absolute_import
     
    from celery import shared_task
     
    @shared_task
    def add(x, y):
      return x + y
     
    创建一个定时任务
    #每分钟执行任务
    @periodic_task(run_every=crontab(minute="*/1"))
    def check_ping():
      print 'Pong'
    

      

     
    更多详细的配置请参考官方文档。
    http://docs.celeryproject.org/en/latest/index.html
  • 相关阅读:
    Kafka源码分析9:Controller控制器的原理(图解+秒懂+史上最全)
    nacos高可用 (史上最全 + 图解+秒懂)
    Kafka源码分析11:PartitionStateMachine分区状态机(图解+秒懂+史上最全)
    Kafka源码分析2:Kafka产品选择和Kafka版本选择(史上最全)
    Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)
    Netty解决Selector空轮询BUG的策略(图解+秒懂+史上最全)
    Kafka源码分析1:源码的开发环境搭建 (图解+秒懂+史上最全)
    mysql pxc集群 原理 (图解+秒懂+史上最全)
    seat TCC 实战(图解_秒懂_史上最全)
    seata 源码解析(图解_秒懂_史上最全)
  • 原文地址:https://www.cnblogs.com/letong/p/4753624.html
Copyright © 2011-2022 走看看