zoukankan      html  css  js  c++  java
  • Celery 收下这捆芹菜!

    Celery简介

    Celery是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)

    Celery 官网:http://www.celeryproject.org/
    Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
    Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

    Celery构成

    Task

    任务模块, 包含异步任务和定时任务, 异步任务通常在业务逻辑中被触发并发往Broker任务队列, 定时任务由Celery Beat 进程周期性地发往Broker任务队列

    Broker

    消息中间件, 就是任务调度队列, 用来接收任务, 将任务存储到队列中, 就像是生产者与消费者模型中的队列一样
    Celery本身不提供Broker, 官方推荐使用RabbitMQ和Redis

    Worker

    任务的执行单元, 它实时监控Broker队列, 获取队列中的任务, 并执行, 可以看做是生产者与消费者模型中的消费者

    Backend

    任务执行结果的存储单元, 用来存储任务结果, 以便查询
    Celery本身不提供Backend, 官方推荐使用RabbitMQ和Redis

    Celery使用

    安装

    python安装Celery: pip install celery
    我们使用Redis作为Broker和Backend, 因此确保你的设备配置了Redis环境

    基本使用

    """
    project
        ├── celery_task  	# celery包
        │   ├── __init__.py # 包文件
        │   ├── celery.py   # celery连接和配置相关文件,且名字必须是celery.py
        │   └── tasks.py    # 所有任务函数
        ├── add_task.py  	# 添加任务
        └── get_result.py   # 获取结果
    
    """
    

    异步任务: delay

    """
    1.执行add_task.py将任务添加到队列
    2.cmd切换至所在文件目录celery_task下运行worker: 
    	>: celery worker -A celery_task -l info -P eventlet
    3.执行get_result.py获取任务结果
    
    """
    
    # celery.py
    from celery import Celery
    
    # 配置消息中间件, 用来接收任务
    broker = 'redis://127.0.0.1:6379/0'
    
    # 配置backend, 用来存储任务执行结果
    backend = 'redis://127.0.0.1:6379/1'
    
    # worker, 任务执行单元
    app = Celery(broker=broker, backend=backend)
    
    # tasks.py
    from .celery import app
    
    # 定义任务
    @app.task
    def add(x, y):
        res = x + y
        print(f'{x}+{y}={res}')
        return res
    
    
    @app.task
    def minus(x, y):
        res = x - y
        print(f'{x}-{y}={res}')
        return res
    
    # add_task.py
    from .tasks import add, minus
    
    # 在业务逻辑中触发异步任务
    add_results = add.delay(10, 20)
    
    # 任务执行结果的id
    print(add_results.id)
    
    # get_result.py
    from .celery import app
    from .add_tasks import add_results
    from celery.result import AsyncResult
    
    if __name__ == '__main__':
        # 获取异步任务结果对象, 参数:id, app
        async = AsyncResult(id=add_results.id, app=app)
        
        if async.successful():
            result = async.get()
            print('任务执行成功')
            print(result)
        elif async.failed():
            print('任务失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    

    延迟任务: apply_async

    from celery_task.tasks import add
    from datetime import timedelta, datetime
    
    # 添加延时任务, 10秒后执行
    add_results = add.apply_async(args=(10, 20), eta=datetime.utcnow() + timedelta(seconds=10))
    
    # 任务执行结果的id
    print(add_results.id)
    
    
    

    周期任务: beat_schedule

    注意: 周期任务是通过 celery beat 来周期性添加的, , 因此启动worker服务之后, 还要重开一个cmd窗口启动beat服务: celery beat -A celery_task -l info

    # celery.py
    from celery import Celery
    from datetime import timedelta
    from celery.schedules import crontab
    
    # 配置消息中间件, 用来接收任务
    broker = 'redis://127.0.0.1:6379/0'
    
    # 配置backend, 用来存储任务执行结果
    backend = 'redis://127.0.0.1:6379/1'
    
    # worker, 任务执行单元
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    
    app.conf.enable_utc = False
    
    app.conf.beat_schedule = {
        'add-task': {
            'task': 'celery_task.tasks.add',
            # 每10秒添加一次任务
            'schedule': timedelta(seconds=10),
            'args': (10, 20)
        },
        'minus-task': {
            'task': 'celery_task.tasks.minus',
            # 每周一八点半添加一次任务
            'schedule': crontab(hour=8, minute=30, day_of_week=1),
            'args': (20, 10)
        }
        
    }
    
    

    Django配置Celery

    在根目录下创建包文件夹 celery_task

    """
    project
        ├── celery_task  	# celery包
            ├── __init__.py # 包文件
            ├── celery.py   # celery连接和配置相关文件,且名字必须是celery.py
            └── tasks.py    # 所有任务函数   
    """
    

    celery.py

    # 1.加载Django配置环境
    import os
    
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'luffyapi.settings.dev')
    
    # 2.加载Celery配置环境
    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/0'
    backend = 'redis://127.0.0.1:6379/1'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks.py'])
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # UTC
    app.conf.enable_utc = False
    
    
    from datetime import timedelta
    app.conf.beat_schedules = {
        'update-banner-cache': {
            'task': 'celery_task.tasks.py.update_banner_cache',
            # 每10秒添加一次
            'schedule': timedelta(seconds=10),
            'args': ()
        }
    }
    
    
    

    task.py

    from .celery import app
    from home.models import Banner
    from home.serializers import BannerModerSerializer
    from django.conf import settings
    from django.core.cache import cache
    
    
    @app.task
    def update_banner_cache():
        print('lalal')
        banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by('-order').all()
        serializer_obj = BannerModerSerializer(data=banner_query, many=True)
        banner_data = serializer_obj.data
        for banner in banner_data:
            banner['image'] = settings.BASE_URL + banner.get('image')
        cache.set('banner_cache', banner_data)
        return True
    
  • 相关阅读:
    Mysql元数据分析
    python编码encode和decode
    自己写的Python数据库连接类和sql语句拼接方法
    【甘道夫】Sqoop1.99.3基础操作--导入Oracle的数据到HDFS
    SVN配置以及自己主动部署到apache虚拟文件夹
    css中使用id和class 的不同
    Android OpenGL ES(七)----理解纹理与纹理过滤
    一键安装 redmine on windows 和发邮件设置
    足球大数据:致足球怀疑论者-The Counter(s)-Reformation反教条改革
    【Android进阶篇】Fragment的两种载入方式
  • 原文地址:https://www.cnblogs.com/bigb/p/12180508.html
Copyright © 2011-2022 走看看