zoukankan      html  css  js  c++  java
  • 异步任务(Celery)详解

    一、背景

    在开发中,我们可能经常会遇到一些需要执行时间很长的任务,如果放在前端,会让用户一直卡在那儿等待或者一直转圈圈,体验非常不好。为了改善这种体验,我赶紧上网搜索,果然,前人早已有解决办法了。那就是异步。在Django中,我们可以使用celery异步框架,我们可以把耗时的任务扔到后台,而前端给用户立即返回,待用户需要查看结果时,点击查看即可,并且可以随时看到任务执行的状态。

    二、原理

    Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。它是Python写的库,但是它实现的通讯协议也可以使用ruby,php,javascript等调用。异步任务除了消息队列的后台执行的方式,还是一种则是定时计划任务。

    Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图 

    组件:

    1、任务(tasks)--用户定义的函数,用于实现用户的功能,比如执行一个耗时很长的任务

    2、中间介(Broker)--用于存放tasks的地方,但是这个中间介需要解决一个问题,就是可能需要存放非常非常多的tasks,而且要保证Worker能够从这里拿取

    3、执行者(Worker)--用于执行tasks,也就是真正调用我们在tasks中定义的函数

    4、存储(Backend)--把执行tasks返回的结果进行存储,以供用户查看或调用

     

    三、实现

    1、各模块功能

    Celery中,以上组件具体功能如下:

    任务模块 Task

    包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

    消息中间件 Broker

    Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

    任务执行单元 Worker

    Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

    任务结果存储 Backend

    Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

    2、实现步骤

    使用 Celery 实现异步任务主要包含三个步骤:

    • 创建一个 Celery 实例 
    • 启动 Celery Worker 
    • 应用程序调用异步任务

    3、操作流程

    既然我们已经知道原理和实现步骤,那么就简单了,开搞吧。以下步骤基本上是按照celery官网最佳实践来操作的。

    相关链接:http://docs.jinkan.org/docs/celery/django/first-steps-with-django.html

     

    a、环境安装(RabbitMQ/Redis、Celery、django-celery、flower)

    b、创建工程(工程:tcelery、应用:app01)

    请注意:这个工程目录是适合于大的工程,小的工程可以直接把tasks放在celery.py文件中。我们大多数tasks都是位于app中,而且app一般不止一个,基本上都会有多个。

     

     c、新建文件

    celery下面需要修改的文件:celery.py、__init__.py、settings文件

    app01下面需要修改的文件:tasks.py文件

     

    d、修改过程

    1、修改settings文件,新增如下配置:

    import djcelery    #导入包
    djcelery.setup_loader() #加载tasks
    BROKER_URL = 'redis://127.0.0.1:6379/0'  #指定broker
    CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' #指定结果存储位置为本地数据库
    #CELERY_RESULT_BACKEND = 'redis://' #指定结果存储位置为redis
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  #指定计划任务为本地数据库配置的
    #CELERY_RESULT_BACKEND = 'redis://'  #指定结果存放位置
    
    

      

    2、__init__.py文件

    #绝对导入,以免celery和标准库中的celery模块冲突
    from __future__ import absolute_import
    
    #以下导入时为了确保在Django启动时加载app,shared_task在app中会使用到
    from .celery import app as celery_app
    

      

    3、celery文件

    from __future__ import absolute_import,unicode_literals
    import os
    from celery import Celery
    from django.conf import settings
    
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "tcelery.settings")  #设置celery可以在命令行中使用
    app = Celery('tcelery', backend='amqp://guest@localhost//', broker='redis://localhost:6379/0')  #创建app实例,并指定backend和broker均为rabbitMQ
    #app = Celery('tcelery', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
    app.conf.CELERY_IGNORE_RESULT = False   #结果不忽略
    #app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' #结果保存在redis中
    
    app.config_from_object('django.conf:settings')  #从文件中加载实例
    app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)  #自动加载tasks,注意:他会去app下面查找tasks.py文件,所以我们必须将task放在tasks.py文件中
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

      

    4、tasks.py

    from tcelery import celery_app
    
    @celery_app.task
    def test(x,y):
        return x+y
    

      

    5、settings文件

    注意:前面settings文件已经修改过,这里再次提到,是需要把app和django-celery注册进入app

    INSTALLED_APPS = (
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        "app01", 
        "djcelery",
    )
    

      

    以上配置修改完成后,我们按照如下方式启动django、woker、flower。

     

     

     

     

     

    4、验证

    a、命令行调用

     

     b、woker执行

     

    c、backend保存结果

     

    d、flower结果查看

    同样,我们也可以将结果保存在redis里面。

     

    四、实操

    1、效果

    上面已经知道了原理和使用,那么下面就来进行实操吧,实操会让你感受celery的真正使用场景。

    场景:模拟后台执行一个耗时的任务(一个加法的任务),然后通过前端查询执行结果。

    效果:

    模拟一个加法的任务,用户点击“运行”后,我们把这个任务放到后台运行,通过sleep(10)来模拟耗时任务,然后通过点击“查看任务”查看执行的情况。

     

    再次查看执行情况:

     

    2、配置

     基本配置上面已经具备了,下面只说修改的几个地方:

    tasks.py

    @celery_app.task
    def test(x,y):
        """
        通过sleep来模拟需要执行很长时间的任务。
        :param x:
        :param y:
        :return:
        """
        sleep(10)
        return x+y
    

      

     views.py文件

    #coding:utf-8
    from django.shortcuts import render,HttpResponse,render_to_response
    from models import Add
    from .tasks import test,get_task_status
    import datetime
    import redis
    import json
    import time
    # Create your views here.
    
    def index(request):
        return  render_to_response('index.html')
    
    def add_1(request):
        try:
            first = int(request.GET.get('first'))
        except:
            first = 0
        try:
            second =int(request.GET.get('second'))
        except:
            second = 0
        result = test.apply_async(args=(first,second))
        dd = Add(task_id=result.id,first=first,second=second,log_date=datetime.datetime.now())
        dd.save()
        return render_to_response('index.html')
    
    # 任务结果
    def results(request):
        #查询所有的任务信息
        start_time = time.time()
        new_result = {}
        rt_list = []
        rows = Add.objects.all()
        for r in rows:
            status,result = get_status_id(r.task_id)
            new_result["task_id"] = r.task_id
            new_result["first"] = r.first
            new_result["second"] = r.second
            new_result["log_date"] = r.log_date
            new_result["status"] = status
            new_result["result"] = result
            rt_list.append(new_result)
            new_result = {}
        end_time = time.time()
        rt = end_time - start_time
        print rt
        return render_to_response('result.html',{'rows':rt_list})
    
    
    def get_status_id(task_id):
        """
        :param task_id:
        :return:
        坑:host填写主机名时,会耗时非常多,可以通过time获取,大概一次要1s
        task测试:这里
        """
        pool = redis.ConnectionPool(host='127.0.0.1',port=6379,db=0)
        r = redis.Redis(connection_pool=pool)
        task_id = 'celery-task-meta-'+task_id
        #start_time = time.time()
        try:
            status = json.loads(r.get(task_id)).get("status")
            result = json.loads(r.get(task_id)).get("result")
        except:
            status = 'Executing...'
            result = 0
        #end_time = time.time()
        #print 'time:%s' %(end_time-start_time)
        print status,result
        return status,result
    

      

     

    五、总结

    从原理和实现过程来看,celery的设计非常优秀,尤其是各模块的耦合,比如broker我们既可以使用redis、也可以使用rabbitMQ。

    backend也一样支持很多种方式。

    六、坑

    1、redis执行时间慢

    在本次试验的过程中遇到一个坑,通过python连接redis的时候,刚开始使用的是主机名:

    pool = redis.ConnectionPool(host='localhost',port=6379,db=0)
    

      发现redis执行时间非常常,查询一条记录需要1s左右,查了好久没找到原因。

    后来把主机名改为ip后,发现非常快:

     

    更多详细内容请参阅celery官网。

    http://docs.jinkan.org/docs/celery/index.html

    2、celery:Unrecoverable error: AttributeError("'unicode' object has no attribute 'iteritems')

    由于项目使用的django版本比较老,python2.7+django1.7下使用celery异步处理耗时请求。

    错误提示:Unrecoverable error: AttributeError("'unicode' object has no attribute 'iteritems')

    在stackoverflow中发现了解决办法,地址:stack overflowcelery-github
    问题的症结是redis的版本号为3.0以上,导致celery将其作为消息中间件的时候出现问题,给出的解决方案是安装3.0以下的redis版本。这里我们安装redis==2.10.6

    pip install redis==2.10.6
    

      

  • 相关阅读:
    MongoDB,无模式文档型数据库简介
    数据说话:怎样的程序员最抢手?
    猛醒:也许我们一生追求的都错了!
    中国风电生产监控平台界面
    如何跟着趋势去赚钱
    2015年最好的员工心态培养 -- 我们需要把简单的事情做到极致
    什么是程序员的核心竞争力?
    第一篇 技术选型
    .net core 读取配置文件
    .net core nlog记录日志
  • 原文地址:https://www.cnblogs.com/skyflask/p/9865378.html
Copyright © 2011-2022 走看看