zoukankan      html  css  js  c++  java
  • Celery

    Celery

      Celery 官网:http://www.celeryproject.org/
      Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
      Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

    Celery架构

      Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(backend  -  task result store)组成。

    消息中间件: Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    任务执行单元: Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    任务结果存储: Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

    使用场景

      异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

      定时任务:定时执行某件事情,比如每天数据统计

    Celery的安装配置:  pip install celery

    Celery执行异步任务

    project
        ├── celery_task      # celery包
        │   ├── __init__.py # 包文件
        │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
        │   └── tasks.py    # 所有任务函数
        ├── add_task.py      # 添加任务
        └── get_result.py   # 获取结果

    不涉及django

    简单使用

    celery_task/celery.py

    from celery import Celery
    
    # 通过Celery功能产生一个celery应用
    broker = 'redis://127.0.0.1:6379/14'  # 任务仓库
    backend = 'redis://127.0.0.1:6379/15'  # 结果仓库
    include = ['celery_task.task1', 'celery_task.task2']  # 任务们,完成需求的函数所在的文件
    app = Celery(broker=broker, backend=backend, include=include)
    # 在(luffy) C:UsersAdministrator.SC-201906081554Desktopluffyluffyapiscriptscelery框架高级使用下启动worker: pip3 install eventlet  celery worker -A celery_task -l info -P eventlet
    # 非windows  命令:celery worker -A celery_task -l info
    # 手动添加任务

    celery_task/task1.py

    from .celery import app
    @app.task
    def add(n1, n2):
        print('运算数', n1, n2)
        print('运算结果: %s' % (n1 + n2))
        return n1 + n2

    celery_task/task2.py

    from .celery import app
    @app.task
    def low(n1, n2):
        print(('减法:%s') % (n1 - n2))
        return n1 - n2

    add_task.py   添加任务

    from celery_task import task1,task2

    # 添加立即执行任务 # 使用模块中的函数, 和celery没有任何关系 # res
    = task1.add(10,5) # print(res) # res1 = task2.low(10,15) # print(res1) # 调用celery框架方法,完成任务添加 # res = task1.add.delay(100, 150) # print(res) # res1 = task2.low.delay(88,22) # print(res1)
    # 添加延迟任务
    from datetime import datetime, timedelta def eta_second(second): ctime = datetime.now() utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) time_delay = timedelta(seconds=second) return utc_ctime + time_delay res = task2.low.apply_async(args=(200, 50), eta=eta_second(10)) print(res)

    get_result.py  获得结果

    from celery_task.celery import app
    from celery.result import AsyncResult
    # id = '00c4f3c8-cdb8-48bb-b73e-4cde9f530151'  # 失败任务
    id = 'feb1c004-e40f-45a0-9796-32e9ddde07c9'  # 成功任务
    if __name__ == '__main__':
        async = AsyncResult(id=id, app=app)
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')

    高级使用

    celery_task/celery.py

    from celery import Celery
    broker = 'redis://127.0.0.1:6379/14'
    backend = 'redis://127.0.0.1:6379/15'
    include = ['celery_task.tasks']
    app = Celery(broker=broker, backend=backend, include=include)
    
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    from datetime import timedelta
    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'jump_task': {
            'task': 'celery_task.tasks.jump',  # task:任务源
            'schedule': timedelta(seconds=3),  # schedule:添加任务的时间配置,每3秒添加一次任务
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
            'args': (25, 3),  # args:执行任务所需参数
        }
    }

    celery_task/tasks.py

    from .celery import app
    @app.task
    def jump(n1, n2):
        print("积: %s" % (n1 * n2))
        return n1 *n2
    
    @app.task
    def full(n1, n2):
        print("商: %s" % (n1 // n2))
        return n1 // n2

    django中使用

    celery_task/celery.py

    非windows:  命令:celery worker -A celery_task -l info

    windows:(luffy) C:UsersAdministrator.SC-201906081554Desktopluffyluffyapi>

    启动worker:pip3 install eventlet     celery worker -A celery_task -l info -P eventlet

    启动beat:celery beat -A celery_task -l info     添加任务

    # 如果对环境变量操作不是很透彻,将celery_task包放在项目根目录下
    # 配置django环境
    import os, django
    # import sys
    # sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
    django.setup()
    
    from celery import Celery
    broker = 'redis://127.0.0.1:6379/14'  # 任务仓库
    backend = 'redis://127.0.0.1:6379/15'  # 结果仓库
    include = ['celery_task.tasks']  # 任务们,完成需求的函数所在的文件
    app = Celery(broker=broker, backend=backend, include=include)
    
    # 在高级使用文件夹下启动worker:celery worker -A celery_task -l info -P eventlet
    # 在高级使用文件夹下在另外一个终端启动beat:celery beat -A celery_task -l info
    # beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)
    # beat添加任务,worker执行任务 # 时区 app.conf.timezone
    = 'Asia/Shanghai' app.conf.enable_utc = False from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'update_banner_list_task': { 'task': 'celery_task.tasks.update_banner_list', # task:任务源 'schedule': timedelta(seconds=10), # schedule:添加任务的时间配置 # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': (), # args:执行任务所需参数 } }

    celery_task/tasks.py

    from .celery import app
    from home.models import Banner
    from settings.const import BANNER_COUNT
    from home.serializers import BannerModelSerializer
    from django.core.cache import cache
    @app.task
    def update_banner_list():
        # 获取最新内容
        banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:BANNER_COUNT]
        # 序列化
        banner_data = BannerModelSerializer(banner_query, many=True).data
        for banner in banner_data:
            banner['image'] = 'http://127.0.0.1:8000' + banner['image']
        # 更新缓存
        cache.set('banner_list', banner_data)
        return True
    dayehui
  • 相关阅读:
    JVM OOM处理
    Scala可变参数列表,命名参数和参数缺省
    Scala闭包
    Scala应用函数
    Scala函数字面量简化写法
    Scala函数字面量
    Scala中class和object的区别
    数据库的优化
    svn: warning: 'xxxxxx' is already under version control
    svn status 显示 ~xx
  • 原文地址:https://www.cnblogs.com/zrh-960906/p/11787668.html
Copyright © 2011-2022 走看看