zoukankan      html  css  js  c++  java
  • Celery

    1 Celery介绍

      1、celery应用举例

          1、Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,
              如果你的业务场景中需要用到异步任务,就可以考虑使用celery

          2、你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,
            你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情

          3、Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis

      2、Celery有以下优点

          1、简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的

          2、高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务

          3、快速:一个单进程的celery每分钟可处理上百万个任务

          4、灵活: 几乎celery的各个组件都可以被扩展及自定制

      3、Celery基本工作流程图

          

        user:用户程序,用于告知celery去执行一个任务。
        broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
        worker:执行任务

    2 celery简单使用

       1、安装

          1.  安装celery pip3 install celery             # ln -s /usr/local/python3/bin/celery /bin/celery

          2.  安装redis

       2、创建tasks.py文件进行验证

    from celery import Celery
    import time
    
    app = Celery('TASK',
                 broker='redis://localhost',        
                 backend='redis://localhost')
    
    @app.task
    def add(x, y):
       print("running..add.", x, y)
       return x + y
    
    @app.task
    def minus(x, y):
       time.sleep(60)
       print("running..minus.", x, y)
       return x - y

       1、启动Celery Worker来开始监听并执行任务

            celery -A tasks worker --loglevel=info            # taskstasks.py文件:必须在tasks.py所在目录下执行

        2、调用任务:再打开两个终端,进行命令行模式,调用任务

             >>> import tasks

            >>> import tasks

            >>> t2 = tasks.minus.delay(9,11)

            #然后在另一个终端重复上面步骤执行

            >>> t1 = tasks.add.delay(3,4)

            >>> t1.get()                                                   #由于t2执行sleep3s所以t1.get()需要等待

      3、celery其他命令

          >>> t.ready()                                                  #返回true证明可以执行,不必等待

          >>> t.get(timeout=1)                                      #如果1秒不返回结果就超时,避免一直等待

          >>> t.get(propagate=False)                          #如果执行的代码错误只会打印错误信息

          >>> t.traceback                                             #打印异常详细结果

    3 在项目中如何使用celery

      1、创建目录celery_pro,并在celery_pro下创建下面两个文件

        1)celery.py

    from __future__ import absolute_import, unicode_literals
    #1. absolute_import 可以使导入的celery是python绝对路基的celery模块,不是当前我们创建的celery.py
    #2. unicode_literals 模块可能是python2和3兼容的,不知道
    from celery import Celery
    # from .celery import Celery        #这样才是导入当前目录下的celery
    
    app = Celery('proj',
                 broker='redis://localhost',
                 backend='redis://localhost',
                 include=['celery_pro.tasks',
                          'celery_pro.tasks2',
                          ])
    #celery-pro是存放celery文件的文件夹名字
    
    #实例化时可以添加下面这个属性
    app.conf.update(
       result_expires=3600,        #执行结果放到redis里,一个小时没人取就丢弃
    )
    
    # 配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数
    app.conf.beat_schedule = {
        'add-every-5-seconds': {
            'task': 'celery_pro.tasks.add',
            'schedule': 5.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'
    
    if __name__ == '__main__':
       app.start()
    

        2)tasks.py

    from __future__ import absolute_import, unicode_literals
    from .celery import app       #从当前目录导入app
    
    #写一个add函数
    @app.task
    def add(x, y):
        return x + y
    

        3)tasks2.py

    from __future__ import absolute_import, unicode_literals
    from .celery import app
    import time,random
    
    @app.task
    def randnum(start,end):
        time.sleep(3)
        return random.randint(start,end)

    touch __init__.py                     # 在celery_pro目录下新建__init__.py文件,否则执行命令时会报错

      2、执行下面两条命令即可让celery定时执行任务了 

        1、 启动一个worker:在celery_pro外层目录下执行

            celery -A celery_pro worker -l info

        2、 启动任务调度器 celery beat

            celery -A celery_pro beat -l info

        3、执行效果

            看到celery运行日志中每5秒回返回一次 add函数执行结果    

      3、启动celery的worker:每台机器可以启动8个worker

                      1pythondir目录下启动 /pythondir/celery_pro/ 目录下的worker

                              celery -A celery_pro worker -l info

                      2、后台启动worker/pythondir/celery_pro/目录下执行

                              celery multi start w1 -A celery_pro -l info             #在后台启动w1这个worker

                              celery multi start w1 w2 -A celery_pro -l info       #一次性启动w1,w2两个worker

                              celery -A celery_pro status                                       #查看当前有哪些worker在运行

                              celery multi stop w1 w2 -A celery_pro                   #停止w1,w2两个worker

                              celery multi restart w1 w2 -A celery_pro               #重启w1,w2两个worker

    4 celery与Django项目最佳实践

    pip3 install Django==2.0.4
    pip3 install celery==4.3.0
    pip3 install redis==3.2.1
    pip3 install ipython==7.6.1 
    
    find ./ -type f | xargs sed -i 's/
    $//g'     # 批量将当前文件夹下所有文件装换成unix格式
    celery  multi start celery_test -A celery_test -l debug --autoscale=50,5        # celery并发数:最多50个,最少5个
    http://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale
    ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9       # 关闭所有celery进程

      1、Django中使用celery介绍(celery无法再windows下运行)

          1)在Django中使用celery时,celery文件必须以tasks.py

          2)Django会自动到每个APP中找tasks.py文件

      2、 创建一个Django项目celery_test,和app01

      3、在与项目同名的目录下创建celery.py 

    from __future__ import absolute_import
    import os
    from celery import Celery
    
    # 只要是想在自己的脚本中访问Django的数据库等文件就必须配置Django的环境变量
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')
    
    # app名字
    app = Celery('celery_test')
    
    # 配置celery
    class Config:
        BROKER_URL = 'redis://192.168.56.11:6379'
        CELERY_RESULT_BACKEND = 'redis://192.168.56.11:6379'
    
    app.config_from_object(Config)
    # 到各个APP里自动发现tasks.py文件
    app.autodiscover_tasks()
    

        4、在与项目同名的目录下的 __init__.py 文件中添加下面内容

    from __future__ import absolute_import, unicode_literals
    
    # 告诉Django在启动时别忘了检测我的celery文件
    from .celery import app as celery_ap
    __all__ = ['celery_app']
    

        5、创建app01/tasks.py文件

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    # 这里不再使用@app.task,而是用@shared_task,是指定可以在其他APP中也可以调用这个任务
    @shared_task
    def add(x, y):
       return x + y
    

        6、在setings.py文件指定redis服务器的配置

    CELERY_BROKER_URL = 'redis://localhost'
    CELERY_RESULT_BACKEND = 'redis://localhost'
    

      

      7、celery_test这个Django项目拷贝到centos7.3django_test文件夹中

      8、保证启动了redis-server

      9、 启动一个celeryworker

          celery -A celery_test worker -l info

      10、Linux中启动 Django项目

          python3 manage.py runserver 0.0.0.0:9000

      11、访问http://1.1.1.3:9000/celery_call/ 获取任务id

           

      12、根据11中的任务id获取对应的值

          http://1.1.1.3:9000/celery_result/?id=5065b65b-0c01-430a-a67f-9531fe3e8d90

    5 基于步骤1.4:在django中使用计划任务功能

      1、Django中使用celery的定时任务需要安装django-celery-beat

          pip3 install django-celery-beat

      2、 在Django的settings中注册django_celery_beat

      INSTALLED_APPS = (
            ...,
            'django_celery_beat',
        )

      3、执行创建表命令

          python3 manage.py makemigrations

          python3 manage.py migrate

          python3 manage.py startsuperuser

      4、运行Django项目

          celery -A celery_test worker -l info

          python3 manage.py runserver 0.0.0.0:9000

      5、登录 http://1.1.1.3:9000/admin/ 可以看到多了三张表

           

      6、在intervals表中添加一条每5秒钟执行一次的任务的时钟

           

      7、在Periodic tasks表中创建任务

           

      8、在/django_test/celery_test/目录下执行下面命令

          celery -A celery_test worker -l info                                                   #启动一个worker

          python manage.py runserver 0.0.0.0:9000                           #运行Django项目

          celery -A celery_test beat -l info -S django                                                   #启动心跳任务

          说明:

            运行上面命令后就可以看到在运行celery -A celery_test worker -l info         的窗口中每5秒钟执行一次app01.tasks.add: 2+3=5

      9、关于添加新任务必须重启心跳问题

          1、 每次在Django表中添加一个任务就必须重启一下beat

          2、 但是Django中有一个djcelery插件可以帮助我们不必重启

  • 相关阅读:
    软件测试人员的年终绩效考核怎么应对
    收藏
    顶踩组件 前后两版
    订阅组件
    hdu 1963 Investment 完全背包
    hdu 4939 Stupid Tower Defense 动态规划
    hdu 4405 Aeroplane chess 动态规划
    cf 414B Mashmokh and ACM 动态规划
    BUPT 202 Chocolate Machine 动态规划
    hdu 3853 LOOPS 动态规划
  • 原文地址:https://www.cnblogs.com/J-xiaowei/p/12681790.html
Copyright © 2011-2022 走看看