zoukankan      html  css  js  c++  java
  • django celery的分布式异步之路(一) 起步

     如果你看完本文还有兴趣的话,可以看看进阶篇http://www.cnblogs.com/kangoroo/p/7300433.html

    设想你遇到如下场景:

    1)高并发

    2)请求的执行相当消耗机器资源,流量峰值的时候可能超出单机界限

    3)请求返回慢,客户长时间等在页面等待任务返回

    4)存在耗时的定时任务

    这时你就需要一个分布式异步的框架了。

    celery会是一个不错的选择。本文将一步一步的介绍如何使用celery和django进行集成,并进行分布式异步编程。

    1、安装依赖

    默认你已经有了python和pip。我使用的版本是:

    python 2.7.10
    pip 9.0.1
    virtualenv 15.1.0

    创建沙盒环境,我们生产过程中通过沙盒环境来使用各种python包的版本,各个应用的沙盒环境之间互不干扰。

    $ mkdir kangaroo
    $ cd kangaroo
    $ virtualenv kangaroo.env
    # 沙盒下面有什么,可以看到有python的bin、include和pip
    $ ll kangaroo.env
    total 8
    drwxrwxr-x 2 data_monitor data_monitor 4096 Aug  7 16:00 bin
    drwxrwxr-x 2 data_monitor data_monitor   22 Aug  7 16:00 include
    drwxrwxr-x 3 data_monitor data_monitor   22 Aug  7 16:00 lib
    -rw-rw-r-- 1 data_monitor data_monitor   60 Aug  7 16:00 pip-selfcheck.json
    # 让沙盒环境在当前session(shell)中生效
    $ source kangaroo.env/bin/activate
    # 可以看到命令行首多了(kangaroo.env),而且python的路径已经变了
    (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ which python
    /data/home/data_monitor/yangfan/test/kangaroo/kangaroo.env/bin/python

    下面我们开始在kangaroo环境下安装相应版本的django和celery,以及django-celery集成包。

    (kangaroo.env) [XXX@XXX kangaroo]$ pip install django==1.10.6
    (kangaroo.env) [XXX@XXX kangaroo]$ pip install celery==3.1.25
    (kangaroo.env) [XXX@XXX kangaroo]$ pip install django-celery==3.2.1

    我在安装的时候写明了版本号,是因为这套版本号在我们的生产环境过玩转过。

    如果你换了对应的版本号的话,可能会引发冲突,出现意想不到的问题。亲测还是有一些版本之间是有怪问题的。

    2、创建工程

    创建工程kangaroo:django-admin startproject kangaroo

    # 在kangaroo.env同级目录下创建工程kangaroo
    cd /home/data_monitor/yangfan/test/kangaroo
    # 创建工程kangaroo (kangaroo.
    env) [data_monitor@bigdata-arch-client10 kangaroo]$ django-admin startproject kangaroo (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll total 0 drwxrwxr-x 3 data_monitor data_monitor 37 Aug 7 16:45 kangaroo drwxrwxr-x 5 data_monitor data_monitor 65 Aug 7 16:00 kangaroo.env

    创建APP foot:python manage.py startapp foot

    # 进入工程目录,你会看到manage.py文件
    cd kangaroo
    # 创建APP foot
    (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ python manage.py startapp foot
    (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll
    total 4
    drwxrwxr-x 3 data_monitor data_monitor 116 Aug  7 16:48 foot
    drwxrwxr-x 2 data_monitor data_monitor 108 Aug  7 16:48 kangaroo
    -rwxrwxr-x 1 data_monitor data_monitor 806 Aug  7 16:45 manage.py
    # 进入foot app目录,看一下有什么
    (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ cd foot/
    (kangaroo.env) [data_monitor@bigdata-arch-client10 foot]$ ll
    total 20
    -rw-rw-r-- 1 data_monitor data_monitor  63 Aug  7 16:48 admin.py
    -rw-rw-r-- 1 data_monitor data_monitor 124 Aug  7 16:48 apps.py
    -rw-rw-r-- 1 data_monitor data_monitor   0 Aug  7 16:48 __init__.py
    drwxrwxr-x 2 data_monitor data_monitor  24 Aug  7 16:48 migrations
    -rw-rw-r-- 1 data_monitor data_monitor  98 Aug  7 16:48 models.py
    -rw-rw-r-- 1 data_monitor data_monitor  60 Aug  7 16:48 tests.py
    -rw-rw-r-- 1 data_monitor data_monitor  63 Aug  7 16:48 views.py

     至此我们创建了工程kangaroo和app foot,下面我们介绍如何集成celery。

    3、django-celery的集成配置

    我们这里集成的方式是使用django-celery包。

    集成配置要注意以下几个地方就好了,配置起来还是比较简单的。

    1)修改kangaroo/settings.py文件

    让djcelery模块生效

    import os
    import djcelery
    djcelery.setup_loader()
    ...
    INSTALLED_APPS = (
       ...
        'djcelery',
        'kombu.transport.django',
        ...
    )

    配置broker和backend

    # Celery settings
    # redis做broker, 第二个":"前后是redis的用户名密码,后面的2是db
    # BROKER_URL = 'redis://:password@10.93.84.53:6379/2'
    # rabbitMQ做broker,第二个":"前后是rabbitMQ的用户名密码
    BROKER_URL = 'amqp://admin:bigdata123@10.93.21.21:5672//'
    # Celery的backend记录地址,这里只给出redis的配置
    CELERY_RESULT_BACKEND = 'redis://:bigdata123@10.93.84.53:6379/3'

    4、第一个task

    其实应该在创建app的时候就将appName添加到settings.py的INSTALLED_APPS中,我们没有这样做事留到现在好说明问题。

    我们修改settings.py加入foot。

    import os
    import djcelery
    djcelery.setup_loader()
    ...
    INSTALLED_APPS = (
       ...
        'djcelery',
        'kombu.transport.django',
       ...
        'foot',
        ...
    )

    当settings.py中的djcelery.setup_loader()运行时, Celery便会查看所有INSTALLED_APPS中app目录中的tasks.py文件, 找到标记为@task的function, 并将它们注册为celery task。

    所以我们在foot包下创建tasks.py文件,并且添加我们的task。

    foot/tasks.py

    # -*- coding:utf-8 -*-
    import time
    import logging
    
    from celery import task
    logger = logging.getLogger(__name__)
    
    @task()
    def foot_task(param_dict):
        logger.info('foot task start! param_dict:%s' % param_dict)
        time.sleep(10)
        logger.info('foot task finished!')
        return

    这个task等待接收一个参数字典,只是简单的打印参数,然后sleep10s就退出了。

    让task在后台worker中注册,当有任务分发下来的时候就开始执行。只需执行

    python manage.py celery worker --loglevel=info

    5、分发任务dispatch

    任务触发的两种方式:

    1)定时调度,可以适用celery beat,这里没有详述,因为在实际生产中,我们使用了apschedule做了定时器。

    2)请求异步执行,这里给出了例子,服务接收http请求,直接返回,任务异步的丢给worker执行。

    我这里写了一个通用的函数,这个函数用于分发任务

    def dispatch(task, param_dict):
        param_json = json.dumps(param_dict)
        try:
            task.apply_async(
                [param_json],
                retry=True,
                retry_policy={
                    'max_retries': 1,
                    'interval_start': 0,
                    'interval_step': 0.2,
                    'interval_max': 0.2,
                },
            )
        except Exception, ex:
            logger.info(traceback.format_exc())
            raise

    如何触发foot.tasks中的任务呢,只需要

    import logging
    import traceback
    from django.http import JsonResponse
    from django.views.decorators.csrf import csrf_exempt
    
    from foot.tasks import foot_task
    from common.dispatcher import dispatch
    
    logger = logging.getLogger(__name__)
    
    @csrf_exempt
    def hello(request):
        if request.method == 'GET':
            try:
                user = request.GET.get('username')
                dispatch(test_task, {'hello': user})
                return JsonResponse({'code': 0, 'msg':'success'})
            except Exception, ex:
                return JsonResponse({'code: -1, 'msg': traceback.format_exc()})

    1)当你在客户端发送请求:hello?username='kangaroo'时

    2)服务瞬间返回:{'code': 0, 'msg':'success'}

    3)后端sleep10秒后执行成功,打印hello:kangaroo

    这就是异步的效果。

  • 相关阅读:
    ABP之动态WebAPI
    ssh下常用操作汇总(good)
    XP下类似%windir% %userprofile% 的变量的说明(转)
    Cannot update paths and switch to branch at the same time(转)
    git branch(转)
    Git提交代码的处理流程(转)
    Android锁定EditText内容和随机生成验证码
    Android L下载
    vi 命令 使用方法
    Openfire开发配置,Openfire源码配置,OpenFire二次开发配置
  • 原文地址:https://www.cnblogs.com/kangoroo/p/7299920.html
Copyright © 2011-2022 走看看