zoukankan      html  css  js  c++  java
  • celery

    1.什么是celery(任务队列)?

    任务队列是一种在线程或机器间分发任务的机制。

    消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。

    Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。

    Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。

    Celery 是用 Python 编写的,但协议可以用任何语言实现。

    在python中定义Celery的时候,我们要引入Broker,中文翻译过来就是"中间人/经纪人"的意思,在这里Broker起到一个中间人的角色,在工头提出任务的时候,把所有的任务放到Broker里面,在Broker的另一头,一群码农等着取出一个个任务准备着手做.

    这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的,所以我们要引入Backend来保存每次任务的结果。这个Backend有点像我们的Broker,也是存储信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到Backend,这样我们取回结果,便可以知道有多少任务执行失败了。

    Celery 介绍

    在Celery中几个基本的概念,需要先了解下,不然不知道为什么要安装下面的东西。概念:Broker,Backend。

    Broker:

      broker是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,进行程序执行,好吧,这个邮箱可以看成是一个消息队列,其中Broker的中文意思是经纪人,其实就是一开始说的消息队列,用来发送和接受信息。这个broker有几个方案可供选择:RabbitMQ(消息队列),Redis(缓存数据库),数据库(不推荐),等等

    什么是backend?

      通常程序发送的消息,发完就完了,可能都不知道对方什么时候接受了,为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果,Backend是在Celery的配置中的一个配置项CELERY_RESULT_BACKEND,作用是保存结果和状态,如果你需要跟踪任务的状态,那么需要设置这一项,可以是Database backend,也可以是Cache backend.

    对于brokers,官方推荐是rabbitmq和redis,至于backend,就是数据库,为了简单可以都使用redis。

    2.怎么使用?

    2.1使用场景

    异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

    定时任务:定时执行某件事情,比如每天数据统计

    2.2 Celery安装配置

    pip install celery

    消息中间件:RabbitMQ/Redis

    app=Celery('任务名',backend='xxx',broker='xxx')

    2.3 Celery执行异步任务

    创建项目celerytest

    创建py文件:celery_app_task.py

    import celery
    import time
    # broker='redis://127.0.0.1:6379/2' 不加密码
    backend='redis://:123456@127.0.0.1:6379/1'
    broker='redis://:123456@127.0.0.1:6379/2'
    cel=celery.Celery('test',backend=backend,broker=broker)
    @cel.task
    def add(x,y):
        return x+y

    创建py文件:add_task.py,添加任务

    from celery_app_task import add
    result = add.delay(4,5)
    print(result.id)

    创建py文件:run.py,执行任务,或者使用命令执行:celery worker -A celery_app_task -l info

    注:windows下:celery worker -A celery_app_task -l info -P eventlet

    from celery_app_task import cel
    if __name__ == '__main__':
        cel.worker_main()
        # cel.worker_main(argv=['--loglevel=info')

    创建py文件:result.py,查看任务执行结果

    from celery.result import AsyncResult
    from celery_app_task import cel
    
    async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

    执行 add_task.py,添加任务,并获取任务ID

    执行 run.py ,或者执行命令:celery worker -A celery_app_task -l info

    执行 result.py,检查任务状态并获取结果

    3.多任务结构

    pro_cel
       ├── celery_task# celery相关文件夹
       │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
       │   └── tasks1.py    # 所有任务函数
        └── tasks2.py    # 所有任务函数
       ├── check_result.py # 检查结果
       └── send_task.py    # 触发任务

    celery.py

    from celery import Celery

    cel = Celery('celery_demo',
                broker='redis://127.0.0.1:6379/1',
                backend='redis://127.0.0.1:6379/2',
                # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
                include=['celery_task.tasks1',
                         'celery_task.tasks2'
                        ])

    # 时区
    cel.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    cel.conf.enable_utc = False

    tasks1.py

    import time
    from celery_task.celery import cel

    @cel.task
    def test_celery(res):
       time.sleep(5)
       return "test_celery任务结果:%s"%res

    tasks2.py

    import time
    from celery_task.celery import cel
    @cel.task
    def test_celery2(res):
       time.sleep(5)
       return "test_celery2任务结果:%s"%res

    check_result.py

    from celery.result import AsyncResult
    from celery_task.celery import cel

    async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

    if async.successful():
       result = async.get()
       print(result)
       # result.forget() # 将结果删除,执行完成,结果不会自动删除
       # async.revoke(terminate=True) # 无论现在是什么时候,都要终止
       # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    elif async.failed():
       print('执行失败')
    elif async.status == 'PENDING':
       print('任务等待中被执行')
    elif async.status == 'RETRY':
       print('任务异常后正在重试')
    elif async.status == 'STARTED':
       print('任务已经开始被执行')

    send_task.py

    from celery_task.tasks1 import test_celery
    from celery_task.tasks2 import test_celery2

    # 立即告知celery去执行test_celery任务,并传入一个参数
    result = test_celery.delay('第一个的执行')
    print(result.id)
    result = test_celery2.delay('第二个的执行')
    print(result.id)

    添加任务(执行send_task.py),开启work:celery worker -A celery_task -l info -P eventlet,检查任务执行结果(执行check_result.py)

    4.Celery执行定时任务

    设定时间让celery执行一个任务

    add_task.py

    from celery_app_task import add
    from datetime import datetime

    # 方式一
    # v1 = datetime(2019, 2, 13, 18, 19, 56)
    # print(v1)
    # v2 = datetime.utcfromtimestamp(v1.timestamp())
    # print(v2)
    # result = add.apply_async(args=[1, 3], eta=v2)
    # print(result.id)

    # 方式二
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay

    # 使用apply_async并设定时间
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)

    类似于contab的定时任务

    多任务结构中celery.py修改如下

    from datetime import timedelta
    from celery import Celery
    from celery.schedules import crontab

    cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
       'celery_task.tasks1',
       'celery_task.tasks2',
    ])
    cel.conf.timezone = 'Asia/Shanghai'
    cel.conf.enable_utc = False

    cel.conf.beat_schedule = {
       # 名字随意命名
       'add-every-10-seconds': {
           # 执行tasks1下的test_celery函数
           'task': 'celery_task.tasks1.test_celery',
           # 每隔2秒执行一次
           # 'schedule': 1.0,
           # 'schedule': crontab(minute="*/1"),
           'schedule': timedelta(seconds=2),
           # 传递参数
           'args': ('test',)
      },
       # 'add-every-12-seconds': {
       #     'task': 'celery_task.tasks1.test_celery',
       #     每年4月11号,8点42分执行
       #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
       #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
       #     'args': (16, 16)
       # },
    }

    启动一个beat:celery beat -A celery_task -l info

    启动work执行:celery worker -A celery_task -l info -P eventlet

    5.Django中使用Celery

    在项目目录下创建celeryconfig.py

    import djcelery
    djcelery.setup_loader()
    CELERY_IMPORTS=(
       'app01.tasks',
    )
    #有些情况可以防止死锁
    CELERYD_FORCE_EXECV=True
    # 设置并发worker数量
    CELERYD_CONCURRENCY=4
    #允许重试
    CELERY_ACKS_LATE=True
    # 每个worker最多执行100个任务被销毁,可以防止内存泄漏
    CELERYD_MAX_TASKS_PER_CHILD=100
    # 超时时间
    CELERYD_TASK_TIME_LIMIT=12*30

    在app01目录下创建tasks.py

    from celery import task
    @task
    def add(a,b):
       with open('a.text', 'a', encoding='utf-8') as f:
           f.write('a')
       print(a+b)

    视图函数views.py

    from django.shortcuts import render,HttpResponse
    from app01.tasks import add
    from datetime import datetime
    def test(request):
       # result=add.delay(2,3)
       ctime = datetime.now()
       # 默认用utc时间
       utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
       from datetime import timedelta
       time_delay = timedelta(seconds=5)
       task_time = utc_ctime + time_delay
       result = add.apply_async(args=[4, 3], eta=task_time)
       print(result.id)
       return HttpResponse('ok')

    settings.py


    INSTALLED_APPS = [
      ...
       'djcelery',
       'app01'
    ]

    ...

    from djagocele import celeryconfig
    BROKER_BACKEND='redis'
    BOOKER_URL='redis://127.0.0.1:6379/1'
    CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

     

     

     

  • 相关阅读:
    SpringBoot 配置提示功能
    【源码解析】凭什么?spring boot 一个 jar 就能开发 web 项目
    【源码解析】自动配置的这些细节都不知道,别说你会 springboot
    validator 自动化校验
    【spring-boot 源码解析】spring-boot 依赖管理梳理图
    js防抖节流
    koa2怎么自定义一个中间件
    nuxt 脚手架创建nuxt项目中不支持es6语法的解决方案
    vue中兄弟之间组件通信
    js中css样式兼容各个浏览器写法
  • 原文地址:https://www.cnblogs.com/zhaijihai/p/10374481.html
Copyright © 2011-2022 走看看