zoukankan      html  css  js  c++  java
  • Django 大神手把手带你上路 ~ celery系列

    Celery

    官方

    Celery 官网:http://www.celeryproject.org/

    Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

    Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

    Celery架构

    Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

    消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

    使用场景

    异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

    定时任务:定时执行某件事情,比如每天数据统计

    Celery的安装配置

    pip install celery

    消息中间件:RabbitMQ/Redis

    app=Celery('任务名', broker='xxx', backend='xxx')

    Celery执行异步任务

    包架构封装

    project
        ├── celery_task  	# celery包
        │   ├── __init__.py # 包文件
        │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
        │   └── tasks.py    # 所有任务函数
        ├── add_task.py  	# 添加任务
        └── get_result.py   # 获取结果
    

    基本使用

    celery.py

    # 1)创建app + 任务
    
    # 2)启动celery(app)服务:
    # 非windows
    # 命令:celery worker -A celery_task -l info
    # windows:
    # pip3 install eventlet
    # celery worker -A celery_task -l info -P eventlet
    
    # 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本
    
    # 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
    
    
    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    

    tasks.py

    from .celery import app
    import time
    @app.task
    def add(n, m):
        print(n)
        print(m)
        time.sleep(10)
        print('n+m的结果:%s' % (n + m))
        return n + m
    
    @app.task
    def low(n, m):
        print(n)
        print(m)
        print('n-m的结果:%s' % (n - m))
        return n - m
    

    add_task.py

    from celery_task import tasks
    
    # 添加立即执行任务
    t1 = tasks.add.delay(10, 20)
    t2 = tasks.low.delay(100, 50)
    print(t1.id)
    
    
    # 添加延迟任务
    from datetime import datetime, timedelta
    def eta_second(second):
        ctime = datetime.now()
        utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
        time_delay = timedelta(seconds=second)
        return utc_ctime + time_delay
    
    tasks.low.apply_async(args=(200, 50), eta=eta_second(10))
    

    get_result.py

    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    

    高级使用

    celery.py

    # 1)创建app + 任务
    
    # 2)启动celery(app)服务:
    # 非windows
    # 命令:celery worker -A celery_task -l info
    # windows:
    # pip3 install eventlet
    # celery worker -A celery_task -l info -P eventlet
    
    # 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
    # 命令:celery beat -A celery_task -l info
    
    # 4)获取结果
    
    
    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'low-task': {
            'task': 'celery_task.tasks.low',
            'schedule': timedelta(seconds=3),
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
            'args': (300, 150),
        }
    }
    
    tasks.py
    from .celery import app
    
    import time
    @app.task
    def add(n, m):
        print(n)
        print(m)
        time.sleep(10)
        print('n+m的结果:%s' % (n + m))
        return n + m
    
    
    @app.task
    def low(n, m):
        print(n)
        print(m)
        print('n-m的结果:%s' % (n - m))
        return n - m
    

    get_result.py

    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    

    django中使用

    celery.py

    # 重点:要将 项目名.settings 所占的文件夹添加到环境变量
    # import sys
    # sys.path.append(r'项目绝对路径')
    
    # 开启django支持
    import os
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', '项目名.settings')
    import django
    django.setup()
    
    
    
    # 1)创建app + 任务
    
    # 2)启动celery(app)服务:
    # 非windows
    # 命令:celery worker -A celery_task -l info
    # windows:
    # pip3 install eventlet
    # celery worker -A celery_task -l info -P eventlet
    
    # 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
    # 命令:celery beat -A celery_task -l info
    
    # 4)获取结果
    
    
    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'django-task': {
            'task': 'celery_task.tasks.test_django_celery',
            'schedule': timedelta(seconds=3),
            'args': (),
        }
    }
    

    tasks.py

    from .celery import app
    # 获取项目中的模型类
    from api.models import Banner
    @app.task
    def test_django_celery():
        banner_query = Banner.objects.filter(is_delete=False).all()
        print(banner_query)
    
  • 相关阅读:
    Java基础——clone()方法浅析
    Unity shader error: “Too many texture interpolators would be used for ForwardBase pass”
    ar 解压一个.a文件报错: xxx.a is a fat file (use libtool(1) or lipo(1) and ar(1) on it)
    How to set up "lldb_codesign" certificate!
    Unity-iPhone has Conflicting Provisioning Settings
    ETC1/DXT1 compressed textures are not supported when publishing to iPhone
    Xcode 提交APP时遇到 “has one iOS Distribution certificate but its private key is not installed”
    XCode iOS之应用程序标题本地化
    苹果电脑(Mac mini或Macbook或iMac)恢复出厂设置
    Unity 4.7 导出工程在XCode10.1上编译报错
  • 原文地址:https://www.cnblogs.com/bladecheng/p/11593827.html
Copyright © 2011-2022 走看看