zoukankan      html  css  js  c++  java
  • Python Celery队列

    Celery队列简介:

    Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery.

    使用场景:

    1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。

    2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

    Celery原理:

    Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis 或者是数据库来存放消息的中间结果

    Celery优点:

    1. 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
    2. 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
    3. 快速:一个单进程的celery每分钟可处理上百万个任务
    4. 灵活: 几乎celery的各个组件都可以被扩展及自定制

    Celery缺点:

        1.目前只能在Linux系统上有较好的支持

    Celery工作流程图:

      

      在传统的web应用中,Django的web页面通过url的映射到view,view再执行方法,如果方法需要调用大量的脚本,执行大量的任务,页面就会阻塞,如果在项目中使用Celery队列.首先用户的任务会被celery放到broker中进行中转,然后将任务分为一个个的task来执行,由于celery是异步机制,所以会直接给用户返回task_id,页面拿到task_id就可以执行后续的操作,比如查看任务进度,暂停任务,而无需等待所有任务全部执行完毕,才能看到页面

    Celery的安装与使用

    1.安装:

      1.在linux(ubuntu)系统上首先安装Celery队列

        pip3 install Celery

           2.在linux安装redis

        sudo apt-get install redis-server

           3.在linux上安装redis-celery中间件

        pip3 install -U "celery[redis]"

           4.启动redis

                 sudo /etc/init.d/redis-server start

    2.创建并执行一个简单的task

       命名为tasks.py

     1 from celery import Celery
     2  
     3 app = Celery('tasks',
     4              broker='redis://localhost',
     5              backend='redis://localhost')
     6  
     7 @app.task
     8 def add(x,y):
     9     print("running...",x,y)
    10     return x+y
    View Code

      启动监听并开始执行该服务

     celery -A tasks worker -l debug

      在开启一个终端进行测试任务

        进入python环境

    1 from tasks import add
    2 t = add.delay(3,3) #此时worker会生成一个任务和任务id
    3 t.get() #获取任务执行的结果
    4 t.get(propagate=False) #如果任务执行中出现异常,在client端不会异常退出
    5 t.ready()#查看任务是否执行完毕
    6 t.traceback #打印异常详细信息
    View Code

    3.在项目中创建celery

    在当前的目录下创建文件夹celery_pro

     mkdir celery_pro

    在此目录下创建两个文件

    目录结构:

    1 celery_proj
    2     /__init__.py
    3     /celery.py
    4     /tasks.py
    View Code

     celery.py(定义了celery的一些元信息)

     1 rom __future__ import absolute_import, unicode_literals
     2 from celery import Celery
     3  
     4 app = Celery('proj',
     5              broker='redis://localhost',   #消息中间接收
     6              backend='redis://localhost', #消息结果存放 
     7              include=['proj.tasks'])          #执行任务的文件   
     8  
     9 # Optional configuration, see the application user guide.
    10 app.conf.update(
    11     result_expires=3600,
    12 )
    13  
    14 if __name__ == '__main__':
    15     app.start()
    View Code

    tasks.py (定义任务执行的具体逻辑和调用的具体方法)

     1 from __future__ import absolute_import, unicode_literals
     2 from .celery import app
     3 
     4 
     5 @app.task
     6 def add(x, y):
     7     return x + y
     8 
     9 
    10 @app.task
    11 def mul(x, y):
    12     return x * y
    13 
    14 
    15 @app.task
    16 def xsum(numbers):
    17     return sum(numbers)
    View Code

    启动worker

     celery -A celery_pro worker -l debug

    再另一个窗口打开python命令模式进行测试

    1 from celery_pro import tasks
    2 
    3 t = tasks.add.delay(3,4)
    4 t.get()
    View Code

     Celery的分布式:多启动worker就可以自动实现负载均衡,无需手动管理

     Celery永驻后台(开启&重启&关闭)

    1 celery multi start w1 -A celery_pro -l info  #开启后台celery任务
    2 celery  multi restart w1 -A proj -l info #重启该服务
    3 celery multi stop w1 -A proj -l info #关闭该服务
    View Code

     Celery定时任务

    在celery_pro文件夹下创建periodic_tasks.py

    目录结构:

    1  celery_proj
    2      /__init__.py
    3      /celery.py
    4      /tasks.py
    5      /periodic_tasks.py
    View Code

    文件内容如下:

     1 from __future__ import absolute_import, unicode_literals
     2 from .celery import app
     3 from celery.schedules import crontab
     4 
     5 
     6 @app.on_after_configure.connect
     7 def setup_periodic_tasks(sender, **kwargs):
     8     # Calls test('hello') every 10 seconds.
     9     sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    10 
    11     # Calls test('world') every 30 seconds
    12     sender.add_periodic_task(30.0, test.s('world'), expires=10)
    13 
    14     # Executes every Monday morning at 7:30 a.m.
    15     sender.add_periodic_task(
    16         crontab(hour=21, minute=42, day_of_week=5),
    17         test.s('Happy Mondays!'),
    18     )
    19 
    20 @app.task
    21 def test(arg):
    22     print(arg)
    View Code

    修改celery.py,加入periodic_task.py

     1 from __future__ import absolute_import, unicode_literals
     2 from celery import Celery
     3 
     4 app = Celery('proj',
     5              broker='redis://localhost',
     6              backend='redis://localhost',
     7              include=['celery_pro.tasks','celery_pro.periodic_tasks'])
     8 
     9 # Optional configuration, see the application user guide.
    10 app.conf.update(
    11     result_expires=3600,
    12 )
    13 
    14 if __name__ == '__main__':
    15     app.start()
    16 ~                                                                                       
    17 ~                      
    View Code

    在服务端启动 celery -A celery_pro worker -l debug

    在客户端启动 celery -A celery_pro.periodic_tasks beat -l debug

    在服务端如果看到打印的hell ,world说明定时任务配置成功

    上面是通过调用函数添加定时任务,也可以像写配置文件 一样的形式添加, 下面是每30s执行的任务

    在celery.py中添加

    1 app.conf.beat_schedule = {
    2     'add-every-30-seconds': {
    3         'task': 'cerely_pro.tasks.add', #执行的具体方法
    4         'schedule': 5.5,  #每秒钟执行
    5         'args': (16, 16)   #执行的具体动作的参数
    6     },
    7 }
    8 app.conf.timezone = 'UTC'
    View Code

    更多定制

    上面的定时任务比较简单,但如果你想要每周一三五的早上8点给你发邮件怎么办呢?用crontab功能,跟linux自带的crontab功能是一样的,可以个性化定制任务执行时间

    1 rom celery.schedules import crontab
    2  
    3 app.conf.beat_schedule = {
    4     #在每周一早上7:30执行
    5     'add-every-monday-morning': {
    6         'task': 'celery_pro.tasks.add',
    7         'schedule': crontab(hour=7, minute=30, day_of_week=1),
    8         'args': (16, 16),
    9     },
    View Code

    还有更多定时配置方式如下:

    Example Meaning
    crontab() Execute every minute.
    crontab(minute=0, hour=0) Execute daily at midnight.
    crontab(minute=0, hour='*/3') Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
    crontab(minute=0,
    hour='0,3,6,9,12,15,18,21')
    Same as previous.
    crontab(minute='*/15') Execute every 15 minutes.
    crontab(day_of_week='sunday') Execute every minute (!) at Sundays.
    crontab(minute='*',
    hour='*',day_of_week='sun')
    Same as previous.
    crontab(minute='*/10',
    hour='3,17,22',day_of_week='thu,fri')
    Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
    crontab(minute=0,hour='*/2,*/3') Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
    crontab(minute=0, hour='*/5') Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5).
    crontab(minute=0, hour='*/3,8-17') Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
    crontab(0, 0,day_of_month='2') Execute on the second day of every month.
    crontab(0, 0,
    day_of_month='2-30/3')
    Execute on every even numbered day.
    crontab(0, 0,
    day_of_month='1-7,15-21')
    Execute on the first and third weeks of the month.
    crontab(0, 0,day_of_month='11',
    month_of_year='5')
    Execute on the eleventh of May every year.
    crontab(0, 0,
    month_of_year='*/3')
    Execute on the first month of every quarter.

     Celery+Django实现异步任务分发

    1.在setting.py的文件同一级别创建celery.py

     1 from __future__ import absolute_import, unicode_literals
     2 import os
     3 from celery import Celery
     4 
     5 # 设置Django的环境变量
     6 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PerfectCRM.settings')
     7 
     8 #设置app的默认处理方式,如果不设置默认是rabbitMQ
     9 app = Celery('proj',
    10              broker='redis://localhost',
    11              backend='redis://localhost'
    12 )
    13 
    14 #配置前缀
    15 app.config_from_object('django.conf:settings', namespace='CELERY')
    16 
    17 #自动扫描app下的tasks文件
    18 app.autodiscover_tasks()
    19 
    20 
    21 @app.task(bind=True)
    22 def debug_task(self):
    23     print('Request: {0!r}'.format(self.request))
    24                                             
    View Code

    2.修改当前目录下的__init__文件

    1 from __future__ import absolute_import, unicode_literals
    2  
    3 #启动时检测celery文件
    4 from .celery import app as celery_app
    5  
    6 __all__ = ['celery_app']
    View Code

    3.在app下新增tasks文件,写要执行的任务

     1 from __future__ import absolute_import, unicode_literals
     2 from celery import shared_task
     3 
     4 
     5 @shared_task
     6 def add(x, y):
     7     return x + y
     8 
     9 
    10 @shared_task
    11 def mul(x, y):
    12     return x * y
    13 
    14 
    15 @shared_task
    16 def xsum(numbers):
    17     return sum(numbers)
    18                             
    View Code

    在另一个app下新增tasks文件

    1 from __future__ import absolute_import, unicode_literals
    2 from celery import shared_task
    3 import time,random
    4 
    5 @shared_task
    6 def randnum(start, end):
    7     time.sleep(3)
    8     return random.ranint(start,end)
    View Code

    在app下的urls.py文件中增加映射

    1 url(r'celery_call', views.celery_call),
    2 url(r'celery_result', views.celery_result),
    View Code

    在views下增加处理逻辑

     1 from crm import tasks
     2 from celery.result import AsyncResult
     3 import random
     4 #计算结果
     5 def celery_call(request):
     6     randnum =random.randint(0,1000)
     7     t = tasks.add.delay(randnum,6)
     8     print('randum',randnum)
     9     return HttpResponse(t.id)
    10 
    11 #获取结果
    12 def celery_result(request):
    13     task_id = request.GET.get('id')
    14     res = AsyncResult(id=task_id)
    15     if res.ready():
    16         return HttpResponse(res.get())
    17     else:
    18         return HttpResponse(res.ready())
    View Code

    测试

    首先启动Django,从web端输入url调用celery_call方法

    例:http://192.168.17.133:9000/crm/celery_call,此方法会返回一个task_id(41177118-3647-4830-b8c8-7be76d9819d7)

    带着这个task_id 访问http://192.168.17.133:9000/crm/celery_result?id=41177118-3647-4830-b8c8-7be76d9819d7如果可以看到结果说明配置成功

     Dnango+Celery实现定时任务

     1.安装Django,Celery中间件

       pip3 install django-celery-beat

    2.在Django的settings文件中,新增app,名称如下

        INSTALLED_APPS = (

      .....,

      'django_celery_beat', #新增的app

    )

    3.输入命令

    python manage.py migrate #创建与Django有关定时计划任务的新表

    4.通过celery beat开启定时任务

    celery -A PrefectCRM beat -l info -S django

    5.启动Django服务,进入admin配置页面

    python3 manager.py runserver 0.0.0.0:9000

    并设置settings.py中的

    ALLOW_HOSTS=['*']

    6.可以在原有业务表的基础之上看到新的三张表

     

    最后配置计划任务表,在此表中将定时任务和执行的频率相关联

    后记:经测试,每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到

     

     


    'django_celery_beat
  • 相关阅读:
    20191024-6 Alpha发布用户使用报告
    【第三章】MySQL数据库的字段约束:数据完整性、主键、外键、非空、默认值、自增、唯一性
    【第六章】MySQL日志文件管理
    【第四章】MySQL数据库的基本操作:数据库、表的创建插入查看
    【第一章】MySQL数据概述
    【Linux运维】LNMP环境配置
    【Linux 运维】linux系统修改主机名
    【Linux 运维】linux系统查看版本信息
    【Linux 运维】Centos7初始化网络配置
    50、树中两个节点的公共祖先
  • 原文地址:https://www.cnblogs.com/luhuajun/p/7488363.html
Copyright © 2011-2022 走看看