zoukankan      html  css  js  c++  java
  • Celery分布式任务队列快速入门

    本节内容

    1. Celery介绍和基本使用

    2. 项目中使用Celery

    3. Celery定时任务

    4. Celery与Django结合

    5. Django中使用计划任务

     

    一  Celery介绍和基本使用

     

    需求场景

    1.  对100台命令执行一条批量命令,命令执行需要很长时间,但是不想让主程序等着结果返回,而是给主程序返回一个任务ID,task_id

    主程序过一段时间根据task_id,获取执行结果即可,再命令执行期间,主程序 可以继续做其他事情

    2.  定时任务,比如每天检测一下所有的客户资料,发现是客户的生日,发个祝福短信

     

    解决方案

    1.  逻辑view 中启一个进程

    父进程结束,子进程跟着结束,子进程任务没有完成,不符合需求

    父进程结束,等着子进程结束,父进程需等着结果返回,不符合需求

    小结:该方案解决不了阻塞问题,即需要等待 

    2. 启动 subprocess,任务托管给操作系统执行

    实现task_id,实现异步,解决阻塞

    小结:大批量高并发,主服务器会出现问题,解决不了并发

    3. celery

    celery提供多子节点,解决并发问题

     

    celery介绍

    celery是一个基于python开发的分布式异步消息队列,轻松实现任务的异步处理

    celery在执行任务时需要一个消息中间件来接收和发送任务消息,以及存储任务结果,一般使用RabbitMQ 或 Redis

     

    celery优点

    简单:熟悉celery的工作流程后,配置使用简单

    高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务

    快速:一个单进程的celery每分钟可处理上百万个任务

    灵活:几乎celery的各个组件都可以被扩展及自定制

     

    celery基本工作流程

    其中中间队列用于分配任务以及存储执行结果

     

    celery安装及使用

    1.  安装python模块

    pip3 install celery
    pip3 install redis

    2.  安装redis服务

    wget  http://download.redis.io/releases/redis-3.2.8.tar.gz
    tar -zxvf redis-3.2.8.tar.gz
    cd redis-3.2.8
    make

    src/redis-server # 启动redis 服务

    3.  创建一个celery application 用来定义任务列表

    创建一个任务 tasks.py

    from celery import Celery
     
    app = Celery('TASK',
                 broker='redis://localhost',
                 backend='redis://localhost')
     
    @app.task
    def add(x,y):
        print("running...",x,y)
        return x+y

     4.  启动celery worker 来开始监听并执行任务

    celery -A tasks worker --loglevel=info

    tasks 任务文件名,worker 任务角色,--loglevel=info 任务日志级别

    5.  调用任务

    打开另外终端,进入命令行模式,调用任务

    6.  celery常用接口

    • tasks.add(4,6) ---> 本地执行

    • tasks.add.delay(3,4) --> worker执行

    • t=tasks.add.delay(3,4)  --> t.get()  获取结果,或卡住,阻塞

    • t.ready()---> False:未执行完,True:已执行完

    • t.get(propagate=False) 抛出简单异常,但程序不会停止

    • t.traceback 追踪完整异常

     

    补充:如何使用第三方工具

    1. 导入第三方包,如 from celery import Celery

    2. 实例化第三方类,如 app = Celery(......)

    3. 实例化的对象去关联执行任务的方法,如 @app.task

    4. 分区角色  worker 执行任务,broker分配任务

     

    二  项目中使用Celery

    1.  项目目录结构

    project
        |-- __init__.py
        |-- celery.py   # 配置文档
        |-- tasks.py    # 任务函数
        |-- tasks2.py   # 任务函数

    2.  项目文件

    project/celery.py

    # from celery import Celery 默认当前路径,更改为绝对路径(当前路径有个celery.py文件啦)
    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    
    app = Celery('project',
                 broker='redis://localhost',
                 backend='redis://localhost',
                 include=['project.tasks','project.tasks2'])  # 配置文件和任务文件分开了,可以写多个任务文件
    
    # app 扩展配置
    app.conf.update(
        result_expires=3600,
    )
    
    if __name__ == '__main__':
        app.start()

    celery.py作用相当于配置文件

    project/tasks.py

    from __future__ import absolute_import, unicode_literals
    from .celery import app
    
    @app.task
    def add(x, y):
        return x + y
    
    @app.task
    def mul(x, y):
        return x * y

    project/tasks.py

    from __future__ import absolute_import, unicode_literals
    from .celery import app
    
    @app.task
    def hello():
        return 'Hello World'

    3.  启动项目worker

    celery -A project worker -l info

    其中 project 为项目名

    另启终端,与project同目录进入python3

    4.  实现分布式 
    当启动多个时 celery -A project worker -l info,去broker去相应任务,实现分布式
     
     
    5.  后台启动woker
    celery multi start w1 -A project -l info
    celery multi start w2 -A project -l info
    celery multi start w3 -A project -l info
     
    celery multi restart w1 -A project -l info
    celery multi stop w1 w2 w3        # 任务立刻停止
    celery multi stopwait w1 w2 w3    # 任务执行完,停止
     
    三  Celery定时任务
     
     
     
    celery支持定时任务,设定好任务的执行时间,celery就会定时帮你执行,这个定时任务模块叫 celery beat
     
    项目目录结构
    project
        |-- __init__.py
        |-- celery.py          # 配置文件
        |-- periodic_task.py   # 定时任务文件

    脚本celery.py

    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    
    app = Celery('project',
                 broker='redis://localhost',
                 backend='redis://localhost',
                 include=['project.periodic_task',])
    
    app.conf.update(
        result_expires=3600,
    )
    
    if __name__ == '__main__':
        app.start()

    脚本periodic_task.py

    from __future__ import absolute_import, unicode_literals
    from .celery import app
    from celery.schedules import crontab
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 每10s调用 test('hello')
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    
        # 每20s调用 test('world')
        sender.add_periodic_task(20.0, test.s('world'), expires=10)
    
        # 每周一早上7:30 执行 test('Happy Mondays!')
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1), # 可灵活修改
            test.s('Happy Mondays!'),
        )
    
    @app.task
    def test(arg):
        print(arg)
     
    启动角色 worker  执行任务
    celery -A project worker -l info
    启动角色 beat 将定时任务放到队列中
    celery -A  project.periodic_task  beat  -l  debug

    也可以在配置文件celery.py 里添加定时任务

    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 'project.tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'

    每周1的早上7.30执行project.tasks.add任务

     

    还有更多定时配置方式如下:

    Example Meaning
    crontab() 每分钟执行  
    crontab(minute=0,hour=0) 每天0点执行
    crontab(minute=0,hour='*/3') 每3小时执行: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
    crontab(minute=0,hour='0,3,6,9,12,15,18,21')
    同上
    crontab(minute='*/15') 每15分钟执行
    crontab(day_of_week='sunday') 周天的每分钟执行
    crontab(minute='*',hour='*',day_of_week='sun')
    同上
    crontab(minute='*/10',hour='3,17,22',day_of_week='thu,fri')

    周三、五,3-4 am, 5-6 pm, and 10-11 pm,每10分钟执行

    crontab(minute=0,hour='*/2,*/3') 每小时/2和每小时/3,执行
    crontab(minute=0, hour='*/5') 每小时/5,执行
    crontab(minute=0, hour='*/3,8-17') 每小时/3,8am-5pm,执行
    crontab(0,0,day_of_month='2') Execute on the second day of every month.
    crontab(0,0,day_of_month='2-30/3')
    Execute on every even numbered day.
    crontab(0,0,day_of_month='1-7,15-21')
    Execute on the first and third weeks of the month.
    crontab(0,0,day_of_month='11',month_of_year='5')
    Execute on the eleventh of May every year.
    crontab(0,0,month_of_year='*/3')
    Execute on the first month of every quarter.
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
    四  Celery与Django结合
     
     
     
    1.  项目目录结构
    LearnCelery
       |-- app1
            |-- tasks.py
            |-- models.py
       |-- app2
            |-- tasks.py
            |-- models.py
       |-- LearnCelery
            |-- __init__.py
            |-- celery.py
            |-- settings.py

    2.  脚本代码

    LearnCelery/app/tasks.py   # 必须叫这个名字

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    import time
    
    # 所有的app都可以调用
    @shared_task
    def add(x, y):
        time.sleep(10)
        return x + y
    
    @shared_task
    def mul(x, y):
        time.sleep(10)
        return x * y

    LearnCelery/LearnCelery/__init__.py

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ['celery_app']

    LearnCelery/LearnCelery/celery.py

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # 单独脚本调用Django内容时,需配置脚本的环境变量
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
    
    app = Celery('mysite')
    
    #  CELERY_ 作为前缀,在settings中写配置
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # 到Django各个app下,自动发现tasks.py 任务脚本
    app.autodiscover_tasks()
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))

    LearnCelery/LearnCelery/settings.py

    # For celery
    CELERY_BROKER_URL = 'redis://localhost'
    CELERY_RESULT_BACKEND = 'redis://localhost'

    3.  启动celery

    celery -A LearnCelery worker -l debug
    4.  urls.py 视图处理
    urlpatterns = [
        url(r'^celery_call/$', views.celery_call),
        url(r'^celery_res/$', views.celery_res),
    ]
    五  Django中使用计划任务
     
     
    1.  安装插件
    pip3 install django-celery-beat
    2.  修改配置 settings.py
    INSTALLED_APPS = [
        ....   
        'django_celery_beat',
    ]

    3. 数据库迁移

    python manage.py migrate

    4.  启动 celery beat

    celery -A LearnCelery beat -l info -S django

    定时任务存到数据库里,启动beat定时取任务放到队列里执行

    5.  admin管理

     

     

    启动celery beat和worker,会发现每隔2秒,beat会发起一个任务消息让worker执行tasks任务

    注意,经测试,每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到

     
    更多信息请参考
    http://www.cnblogs.com/alex3714/articles/6351797.html
     
    代码示例请参考
    https://github.com/Jonathan1314/LearnCelery 
     
  • 相关阅读:
    scikit-opt——DE(差分进化)
    scikit-opt——SA(模拟退火)
    scikit-opt——Python中的群体智能优化算法库
    springboot EnableAutoConfiguration
    JAVA线程sleep和wait方法区别
    数据库优化-水平拆分 垂直拆分
    component和bean区别
    mysql 索引
    maven资源文件的相关配置
    IntelliJ IDEA 2017.2.2 的破解 有效期 2116年
  • 原文地址:https://www.cnblogs.com/jonathan1314/p/7649249.html
Copyright © 2011-2022 走看看