zoukankan      html  css  js  c++  java
  • python---Celery分布式任务队列了解

    linux下定时器了解

    Celery 框架学习笔记(不错哟)

    Celery 分布式任务队列快速入门

    Celery的最佳实践

    一、Celery介绍

    Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

    1. 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。 
    2. 你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

    Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis。所以我们需要先去安装这个软件。

    Celery的构架

    Celery的架构由三部分组成,消息中间件(message broker)任务执行单元(worker)任务执行结果存储(task result store)组成。

    消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,RabbitMQ,Redis,MongoDB等,这里我先去了解RabbitMQ,Redis

    任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中

    任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,Django ORM,AMQP等,这里我先不去看它是如何存储的,就先选用Redis来存储任务执行结果。

    一般通过启动一个或多个worker进程来部署Celery。
    这些worker进程连接上消息代理(以下称之为broker)来获取任务请求。
    broker随机将任务请求分发给worker。
    通过调用Celery的API,用户生成一个任务请求,并且将这个请求发布给broker。
    在worker完成任务后,将完成的任务信息发送给result store(设置的backend),从中获取信息。

    通过启动新的worker进程并让这些进程连上broker,可以很方便的扩展worker池。
    每个worker可以和其他的worker同步执行任务。

    优点:

    简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
    高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
    快速:一个单进程的celery每分钟可处理上百万个任务
    灵活: 几乎celery的各个组件都可以被扩展及自定制

    安装:

    pip3 install celery

    二:基本使用(windows下即可)

    (一)创建一个任务文件cel.py

    from celery import Celery
    
    app = Celery('tasks',
                 broker='redis://127.0.0.1',  #这里面存放任务,worker去里面获取任务,将执行结果放入backend中      注意:由于我们安装的redis配置为本机监听,所以使用127.0.0.1才可,若是使用localhost可能无法连接
                 backend='redis://127.0.0.1')  #从这里面获取我们任务执行的结果
    
    @app.task
    def add(x, y):
        print("running...", x, y)
        return x + y
    
    @app.task
    def cmd(comm):
        print(comm)
        return comm

    (二)启动Celery Worker来开始监听并执行任务

    注意:在windows下可能使用会报错

    解决方案:

    1.安装eventlet

    pip install eventlet

    2.启动时加上一个参数

    celery -A <任务文件> worker -l info -P eventlet

    开始使用

    1.启动Celery Worker来开始监听并执行任务

    celery -A cel worker -l info -P eventlet

    2.在python命令行中连接celery,调用任务

    D:MyPythonday25	wisted_test>python
    Python 3.5.4 (v3.5.4:3f56838, Aug  8 2017, 02:17:05) [MSC v.1900 64 bit (AMD64
     on win32
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from cel import add,cmd
    >>> t = add.delay(45,1)
    >>> t.get()
    46

     (三)设置超时处理

    from celery import Celery
    import time
    
    app = Celery('tasks',
                 broker='redis://127.0.0.1',
                 backend='redis://127.0.0.1')
    
    
    @app.task
    def add(x, y):
        print("running...", x, y)
        return x + y
    
    @app.task
    def cmd(comm):
        time.sleep(10)
        return comm

    再次启用:

    >>> c = cmd.delay("ccc")  
    >>> c.get()  #会等待到10秒才会获取到值
    'ccc'
    [2018-06-30 15:21:11,836: INFO/MainProcess] Task cel.cmd[ff9379e7-520e-41c2-ac1d
    -79400e042fb8] succeeded in 10.014999999999418s: 'ccc'  #会一直等待10秒才会返回给get

    我们需要设置超时时间:

    >>> c.get(timeout=1)  #我们将任务delay交给远程后,远程已经开始执行了,不过当我们调用get时远程还是没有执行完毕而已
    Traceback (most recent call last):
      File "C:UsersAdministratorAppDataLocalProgramsPythonPython35libsite-p
    ackagesceleryackendsasync.py", line 255, in _wait_for_pending
        on_interval=on_interval):
      File "C:UsersAdministratorAppDataLocalProgramsPythonPython35libsite-p
    ackagesceleryackendsasync.py", line 54, in drain_events_until
        raise socket.timeout()
    socket.timeout
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "C:UsersAdministratorAppDataLocalProgramsPythonPython35libsite-p
    ackagescelery
    esult.py", line 224, in get
        on_message=on_message,
      File "C:UsersAdministratorAppDataLocalProgramsPythonPython35libsite-p
    ackagesceleryackendsasync.py", line 188, in wait_for_pending
        for _ in self._wait_for_pending(result, **kwargs):
      File "C:UsersAdministratorAppDataLocalProgramsPythonPython35libsite-p
    ackagesceleryackendsasync.py", line 259, in _wait_for_pending
        raise TimeoutError('The operation timed out.')
    celery.exceptions.TimeoutError: The operation timed out.

    我们要么捕获错误,要么使用其他方法ready,去查看远程执行状态,False未完成,True已完成

    >>> c = cmd.delay("cccde")
    >>> c.ready()
    False
    >>> c.ready()
    False
    >>> c.ready()
    False
    >>> c.ready()
    True
    >>> c.get()
    'cccde'

    (四)其他命令

    1.开启多个worker在后台

    celery multi start w1 -A proj -l info

    2.重启

    celery  multi restart w1 -A proj -l info

    3.停止

    celery multi stop w1 -A proj -l info

    4.等待任务执行完毕后停止

    celery multi stopwait w1 -A proj -l info

    三:项目中使用celery(常与Django一起)

    项目目录:

    celProject  目录
    ---celery.py  #这个文件必须这样命名
    ---tasks.py
    ---tasks2.py
    from __future__ import absolute_import, unicode_literals    #是说下面的celery是python安装包决定路径引入,而不是当前项目
    from celery import Celery   #celery是指python安装包决定路径引入,.celery是当前目录引入
    
    app = Celery('proj',    #celery APP名称
                 broker='redis://127.0.0.1',
                 backend='redis://127.0.0.1',
                 include=['celProject.tasks','celProject.tasks2'])    #include引入的是当前项目下的任务,为列表,可以引入多个
    
    # Optional configuration, see the application user guide.
    app.conf.update(    #设置配置
        result_expires=3600,    #设置结果缓存时间
    )
    
    if __name__ == '__main__':
        app.start()
    celery.py
    from __future__ import absolute_import, unicode_literals
    from .celery import app ##用到他的装饰器
    
    @app.task
    def add(x, y):
        return x + y
    
    
    @app.task
    def mul(x, y):
        return x * y
    
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    tasks.py
    from __future__ import absolute_import, unicode_literals
    from .celery import app #用到他的装饰器
    
    @app.task
    def cmd(comm):
        print("running cmd...")
        return comm
    
    @app.task
    def file_transfer(filename):
        print("send file")
    tasks2.py

    命令执行:启动Celery Worker和python解释器都是在项目同级下进行的

    celery -A celProject worker -l info -P eventlet

    >>> from celProject import tasks,tasks2
    >>> t2 = tasks2.cmd.delay('ff')
    >>> t2.get()
    'ff'
    >>> t2 = tasks.xsum.delay([1,3,5,44])
    >>> t2.get()
    53

     四:设置定时器(定时执行任务)流程图中celery beat(任务调度器)

    我们另起一个进程,执行任务调度器

    celery -A 项目.具体任务 beat  #启动任务调度器

    任务调度器,内部相当于,一直循环计时,每当时间一到,将放置一个任务到broker中间件中,任务就可以由worker执行

    接着在上面的项目目录下创建一个任务:ontime_task.py

    celProject  目录
    ---celery.py  #这个文件必须这样命名
    ---tasks.py
    ---tasks2.py
    ---ontime_task.py    #任务调度器
    from __future__ import absolute_import, unicode_literals
    from celery.schedules import crontab
    from .celery import app
    
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')  #.s就是相当于delay,发送到worker执行
    
        # Calls test('world') every 30 seconds
        sender.add_periodic_task(30.0, test.s('world'), expires=10)  #默认是秒单位
    
        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),  #可以通过crontab设置其他格式时间
            test.s('Happy Mondays!'),
        )
    
    
    @app.task
    def test(arg):  #定时执行的任务
        print(arg)

    正常启动Celery Worker,然后开启任务调度器

    celery -A celProject worker -l info -P eventlet  #开启一个worker,一会任务调度器中任务由他执行
    celery -A celProject.ontime_task beat -l debug  #开启任务调度器,由于我们是同项目中app,所以我们需要在项目同级下启动该任务

    测试效果:每隔10秒一个hello,30秒一个world...

    补充:上面是函数形式执行的定时任务,下面使用配置文件模式

    修改ontime_task.py

    from __future__ import absolute_import, unicode_literals
    from celery.schedules import crontab
    from .celery import app
    
    app.conf.beat_schedule = {
        'add-every-monday-morning': {
            'task': 'celProject.tasks.add',    #这里的任务名,我们需要和启动的worker显示的任务列表中任务一致
            'schedule': crontab(hour=7, minute=30, day_of_week=1),
            'args': (16, 16),
        },
        'add-every-5-seconds': {
            'task': 'celProject.tasks.add',  # 这里的任务名,我们需要和启动的worker显示的任务列表中任务一致
            'schedule': 5,
            'args': (13, 26),
        },
        'add-every-10-seconds': {
            'task': 'celProject.tasks.mul',  # 这里的任务名,我们需要和启动的worker显示的任务列表中任务一致
            'schedule': 10,
            'args': (13, 26),
        },
    }
    
    app.conf.timezone = "UTC"
    
    @app.task
    def test(arg):
        print(arg)

    重新启动任务调度器,测试结果

     五:与Django联合使用

    (一)在Django项目中创建任务

    1.在项目配置文件同级下设置celery.py文件,功能和上面一样,负责收集任务,进行调度

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CRM.settings')  #需要和项目名一致
    
    app = Celery('CRM')  #这个是设置worker名称,随便写,与项目一致吧
    
    # Using a string here means the worker don't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):  #测试任务,可以不要
        print('Request: {0!r}'.format(self.request))

    2.在各个所需要定时任务的APP下添加tasks.py文件

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    @shared_task
    def add(x, y):
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    sale下的tasks.py
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    @shared_task
    def cmd(comm):
        print("running cmd...")
        return comm
    
    @shared_task
    def file_transfer(filename):
        print("send file")
    student下的tasks.py

    任务创建完毕。

    3.可以先在项目下开启worker

    celery -A CRM worker -l info -P eventlet  #是在settings.py文件的上级目录的同级去执行的启动命令,因为celery放在这个目录下面

    (二)开始配置Django

    1. 由于需要使用到django-celery-beat模块,所以需要先安装

    pip install django-celery-beat

    2.添加django-celery-beat模块到INSTALLED_APPS中,便于操作数据库

        INSTALLED_APPS = (
            ...,
            'django_celery_beat',
        )

    3.配置数据库

    python manage.py migrate

    4.启动项目,在管理员页面生成任务

    (1)查看表

    (2)设置周期:固定时间和间隔时间两种

     (3)创建任务

     

    5.启动任务调度器,会去数据库中获取我们设置的任务,按照我们设置的时间间隔执行任务

    celery -A CRM beat -l info -S django  #也是需要在settings的上级目录下执行

    (三)结果查看

     

  • 相关阅读:
    面向对象概述(课堂笔记)
    final
    static方法
    Ubuntu中Qt5.7.0无法输入中文
    Ubuntu中Qt+opencv图像显示
    Ubuntu中Qt新建窗体提示lGL错误
    Ubuntu中Qt5.7.0的安装及opencv2.4.13配置
    Ubuntu16.04删除客人会话
    ffmpeg的安装--opencv视频处理必备
    CentOS+OpenCV图像的读入、显示
  • 原文地址:https://www.cnblogs.com/ssyfj/p/9247254.html
Copyright © 2011-2022 走看看