zoukankan      html  css  js  c++  java
  • celery

    celery架构

    消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP,Redis等

    版本支持情况

    Celery version 4.0 runs on
            Python ❨2.7, 3.4, 3.5❩
            PyPy ❨5.4, 5.5❩
        This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
    
        If you’re running an older version of Python, you need to be running an older version of Celery:
    
            Python 2.6: Celery series 3.1 or earlier.
            Python 2.5: Celery series 3.0 or earlier.
            Python 2.4 was Celery series 2.2 or earlier.
    
        Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
    

    使用场景

    异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

    定时任务:定时执行某件事情,比如每天数据统计

    Celery的安装配置

    pip install celery

    消息中间件:RabbitMQ/Redis

    app=Celery('任务名', broker='xxx', backend='xxx')


    celery简单实用

    第一步: 配置celery参数 broker, worker, backend

    celery_app_task.py

    import celery
    import time
    broker = 'redis://127.0.0.1/1'
    backend = 'redis://127.0.0.1/2'
    
    app = celery.Celery(broker=broker, backend=backend)
    
    @app.task
    def add(n, m):
        time.sleep(3)
        return n + m
    

    第二步: 生成celery任务,将任务id存入redis(执行result.py)

    result.py

    from celery.result import AsyncResult
    from celery_app_task import app
    from celery_app_task import add
    
    result = add.delay(10, 35)
    async = AsyncResult(id=result.id, app=app)
    
    if async.successful():
        result = async.get()
        print(result)
    
    if async.failed():
        print('任务失败了')
    

    第三步. 启动celery任务

    celery  -A celery_app_task worker --loglevel=INFO
    


    celery多任务

    ├── __pycache__
    │   └── add_tasks.cpython-36.pyc
    ├── add_tasks.py
    ├── celery_task
    │   ├── __init__.py
    │   ├── celery.py
    │   └── tasks.py
    └── get_results.py
    

    第一步: 配置celery参数 broker, worker, backend

    celery.py

    import celery
    broker = 'redis://127.0.0.1/1'
    backend = 'redis://127.0.0.1/2'
    
    app = celery.Celery('cleryDemo', broker=broker, backend=backend, include=['celery_task.tasks'])
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    
    # 是否使用UTC
    app.conf.enable_utc = False
    

    第二步: 任务文件

    • 包文件配置__init__.py
    from .celery import app
    from .tasks import *
    

    • 任务文件

    tasks.py

    from . import app
    
    
    @app.task
    def add(n, m):
        return n + m
    
    
    @app.task
    def low(n, m):
        return n - m
    
    
    __all__ = ['add', 'low']
    

    第三步: 添加到celery任务列表

    add_tasks.py

    执行该文件

    from celery_task import add, low
    
    
    r1 = add.delay(200, 100)
    addID = r1.id
    
    r2 = low.delay(200, 100)
    lowID = r2.id
    

    第四步:获取celery入口文件

    get_results.py

    from celery.result import AsyncResult
    from celery_task.celery import app
    
    async = AsyncResult(id='2bf35cc7-07d5-4e62-bae8-4472efb869fc', app=app)
    
    if async.successful():
        result = async.get()
        print(result)
    
    elif async.failed():
        print('执行失败')
    
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    第五步: 启动celery任务

    cd celery多任务
    celery  -A celery_task worker --loglevel=INFO
    


    celery延时任务

    ├── add_tasks.py
    ├── celery_task
    │   ├── __init__.py
    │   ├── celery.py
    │   └── tasks.py
    └── get_results.py
    

    第一步: 配置celery参数 broker, worker, backend

    celery.py

    import celery
    broker = 'redis://127.0.0.1/1'
    backend = 'redis://127.0.0.1/2'
    
    app = celery.Celery('cleryDemo', broker=broker, backend=backend, include=['celery_task.tasks'])
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    
    # 是否使用UTC
    app.conf.enable_utc = False
    

    第二步: 任务文件

    • 包文件配置__init__.py
    from .celery import app
    from .tasks import *
    

    • 任务文件

    tasks.py

    from . import app
    
    
    @app.task
    def add(n, m):
        return n + m
    
    
    @app.task
    def low(n, m):
        return n - m
    
    
    __all__ = ['add', 'low']
    

    第三步: 添加到celery任务列表

    add_tasks.py

    执行该文件

    from celery_task import add, low
    from datetime import datetime, timedelta
    
    #延时时间
    ctime = datetime.now()
    # 默认utc时间
    utcCtime = datetime.utcfromtimestamp(ctime.timestamp())
    timeDelay = timedelta(seconds=10)
    taskTime = utcCtime + timeDelay
    # 添加延迟任务
    r1 = add.apply_async(args=(200, 100), eta=taskTime)
    addID = r1.id
    
    r2 = low.apply_async(args=(200, 100), eta=taskTime)
    lowID = r2.id
    

    第四步:获取celery入口文件

    get_results.py

    from celery.result import AsyncResult
    from celery_task.celery import app
    
    async = AsyncResult(id='2bf35cc7-07d5-4e62-bae8-4472efb869fc', app=app)
    
    if async.successful():
        result = async.get()
        print(result)
    
    elif async.failed():
        print('执行失败')
    
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    第五步: 启动celery任务

    cd celery延迟任务
    celery  -A celery_task worker --loglevel=INFO
    


    celery定时任务

    第一步: 配置celery参数 broker, worker, backend

    celery.py

    import celery
    from datetime import timedelta
    from celery.schedules import crontab
    
    broker = 'redis://127.0.0.1/1'
    backend = 'redis://127.0.0.1/2'
    
    app = celery.Celery('cleryDemo', broker=broker, backend=backend, include=['celery_task.tasks'])
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 配置任务
    
    app.conf.beat_schedule = {
        'add-every-5-seconds': {
            'task': 'celery_task.tasks.add',
            'schedule': timedelta(seconds=5),
            'args': (3000, 1000)
        },
        'low-every-10-seconds': {
            'task': 'celery_task.tasks.low',
            'schedule': timedelta(seconds=10),
            'args': (3000, 1000)
        },
        'add-every-time': {
            'task': 'celery_task.tasks.add',
            'schedule': crontab(hour=0, minute=0),
            'args': (3000, 1000)
        },
    }
    

    第二步: 任务文件

    • 包文件配置__init__.py
    from .celery import app
    from .tasks import *
    

    • 任务文件

    tasks.py

    from . import app
    
    
    @app.task
    def add(n, m):
        return n + m
    
    
    @app.task
    def low(n, m):
        return n - m
    
    
    __all__ = ['add', 'low']
    

    第三步:获取celery入口文件

    get_results.py

    from celery.result import AsyncResult
    from celery_task.celery import app
    
    async = AsyncResult(id='2bf35cc7-07d5-4e62-bae8-4472efb869fc', app=app)
    
    if async.successful():
        result = async.get()
        print(result)
    
    elif async.failed():
        print('执行失败')
    
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    第五步: 启动celery任务

    cd celery定时任务
    celery  -A celery_task worker --loglevel=INFO
    celery  -A celery_task beat --loglevel=INFO 
    


  • 相关阅读:
    QT 读写sqllite数据库
    SQLLite 简介
    arcengine 开发经典帖 【强烈推荐仔细研读】
    IHookHelper的用法
    ArcSDE中Compress与Compact的区别
    以Network Dataset(网络数据集)方式实现的最短路径分析
    ArcGIS网络概述
    ClassLoader.getResourceAsStream(name);获取配置文件的方法
    Xml中SelectSingleNode方法,xpath查找某节点用法
    Spring整合JUnit4测试使用注解引入多个配置文件
  • 原文地址:https://www.cnblogs.com/cjwnb/p/14461486.html
Copyright © 2011-2022 走看看