zoukankan      html  css  js  c++  java
  • Celery-分布式任务队列

    简介

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必须工具。它是一个专注于实现处理的任务队列,同时也支持任务调度。

    当然在Celery在执行任务时需要通过一个中间件来接收和发送任务消息,以及存储任务结果,一般会使用rabbitMQ 或者 Redis。

    优点

    • 简单:一个熟悉了celery的工作流程后,配置和使用还是比较简单的
    • 高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
    • 快速:一个单进程的celery每分钟可处理上百万个任务
    • 灵活:几乎celery的各个组件都可以被扩展及自定制

    celery工作流程图如下

    也许现在会有人问,何为任务队列?

    任务队列

    任务队列是一种在线程或机器间分发任务的机制。
    消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。
    Celery是消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。

    所以在使用之前,我们需要安装好环境才可

    Celery的安装使用

    Celery的默认broker是RabbitMQ, 仅需配置一行就可以

    broker_url = 'amqp://guest:guest@localhost:5672//'
    

    事先的安装好rabbitMQ

    如果rabbitMQ安装不上也可以使用Redis作为broker

    安装redis组件

    $ pip install -U "celery[redis]"
    

    配置

    app.conf.broker_url = 'redis://localhost:6379/0'
    
    redis://:password@hostname:port/db_number
    
    app.conf.result_backend = 'redis://localhost:6379/0'
    

    一切准备就绪后,开始使用celery

    安装celery模块

    $ pip install celery
    

    那么开始创建一个任务文件tasks.py测试一下吧

    from celery import Celery
     
    app = Celery('tasks',
                 broker='redis://localhost',
                 backend='redis://localhost')
     
    @app.task
    def add(x,y):
        print("running...",x,y)
        return x+y
    
    # 注:localhost指的是本机,如果想要连远程主机,需要填写要链接的主机IP地址
    

    启动Celery Worker来开始监听并执行任务

    $ celery -A tasks worker --loglevel=info
    

    去调用任务

    重新打开一个终端去执行

    >>> from tasks import add
    >>> add.delay(4,5)
    

    这时我们会得到一个结果,如果我们这样写,就不会去等待结果的输入,而去做其他事情了

    > result = add.delay(4, 5)
    

    我们要查看返回是否已经完成处理任务

    >>> result.ready()
    False
    

    通过get去取值

    >>> result.get()
    9
    

    与Django配合使用  

    首先建立一个项目,目录如下

    - proj/
      - proj/__init__.py
      - proj/settings.py
      - proj/urls.py
    - manage.py
    

     在与settings同等级的目录下建立一个celery.py文件,内容如下

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
     
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
     
    app = Celery('proj')
     
    # Using a string here means the worker don't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
     
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
     
     
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    
    # 需要注意的是'proj.settings和app = Celery('proj')是随你创建项目的名字不同而做修改
    

    在同settings同等目录下的__init__中填写如下内容

    from __future__ import absolute_import, unicode_literals
     
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
     
    __all__ = ['celery_app']
    

    然后就是在你自己的app里创建一个tasks.py填写你的任务,例如

    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
     
     
    @shared_task
    def add(x, y):
        return x + y
     
     
    @shared_task
    def mul(x, y):
        return x * y
     
    

    然后在Django里的view函数中去执行调用即可 

    from django.shortcuts import render,HttpResponse
    from app01 import tasks
    # Create your views here.
    from celery.result import AsyncResult
    
    
    
    def index(request):
    
        res = tasks.add.delay(5,5)
        print(res)
    
        return HttpResponse(res.task_id) # 返回的ID
    
    def task_res(request):
    
        result = AsyncResult(id="")
    
    
        return HttpResponse(result.status)
    

    更多详细用法请见 http://docs.jinkan.org/docs/celery/getting-started/introduction.html#id17  

     

      

      

      

  • 相关阅读:
    抖音的服务器到底啥配置?
    三句话搞懂 Redis 缓存穿透、击穿、雪崩!
    Windows环境下安装Redis
    Redis可视化工具 Redis Desktop Manager
    Eureka自我保护机制
    Eureka介绍
    Spring Cloud OpenFeign 工作原理解析
    客户端负载均衡Ribbon:Loadbalance的源码
    spring boot中的约定优于配置
    Arrays.asList()返回的集合不能进行add,remove等操作
  • 原文地址:https://www.cnblogs.com/flash55/p/6430310.html
Copyright © 2011-2022 走看看