zoukankan      html  css  js  c++  java
  • celery 分布式异步任务框架(celery简单使用、celery多任务结构、celery定时任务、celery计划任务、celery在Django项目中使用Python脚本调用Django环境)

     

     

     

    一.celery简介

    Celery 是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    可以看到,Celery 主要包含以下几个模块:

    • 任务模块 Task

      包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

    • 消息中间件 Broker

      Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

    • 任务执行单元 Worker

      Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

    • 任务结果存储 Backend

      Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redis 和 MongoDB 等。

     所以总结一下celery:它是一个处理大量消息的分布式系统,能异步任务、定时任务,使用场景一般用于耗时操作的多任务或者定时性的任务。

    二.celery安装与使用

    pycharm安装:

    pip3 install celery

    初步使用:(创建一个python项目)

    ① 实例化一个celery对象,使用该对象.task装饰需要管理的任务函数:

    # celery_task.py
    
    from celery import Celery
    
    """
    # 如果redis没有设置密码
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    """
    broker = 'redis://:12345@127.0.0.1:6379/1'
    backend = 'redis://:12345@127.0.0.1:6379/2'
    # c1是实例化产生的celery的名字,因为会存在多个celery
    app = Celery('c1', broker=broker, backend=backend)
    
    # 需要使用一个装饰器,来管理该任务(函数)
    @app.task
    def add(x, y):
        import time
        time.sleep(1)
        return x + y

    ② 将装饰的任务函数条件到消息队列中,此时提交的任务函数并没有执行,只是提交到worker,它会返回一个标识任务的字符串

    # submit.task.py
    
    # 用于提交任务
    from celery_task import add
    # 提交任务到消息队列中,这里只是将任务提交,并没有执行
    
    res = add. delay(3, 8)
    print(res)
    # 结果是标识任务的字符串(id号)
    # 7811a028-428c-4dd5-9135-788e26e694a7

    ③ 使用命令启动worker去刚才提交的执行任务

    linux: celery worker -A celery_task -l info   
    windows下:celery worker -A celery_task -l info -P eventlet

    ④ 查看结果,根据提交任务返回的字符串去查询

    from celery.result import AsyncResult
    from celery_task import app
    
    async = AsyncResult(id='bd600820-9366-4220-a679-3e435ae91e71', app=app)
    
    if async.successful():
        result = async.get()
        print(result)
    
    elif async.failed():
        print('执行失败')
    
    elif async.status == 'PENDING':
        print('任务等待中')
    
    elif async.status == 'RETRY':
        print('任务异常后重试')
    
    elif async.status == 'STARTED':
        print('任务正在执行')

    celery简单使用流程:

    -celery的使用
        -pip3 install celery
        -写一个py文件:celery_task
            -1 指定broker(消息中间件),指定backend(结果存储)
            -2 实例化产生一个Celery对象 app=Celery('名字',broker,backend)
            -3 加装饰器绑定任务,在函数(add)上加装饰器app.task
            -4 其他程序提交任务,先导入add,add.delay(参,参数),会将该函数提交到消息中间件,但是并不会执行,有个返回值,直接print会打印出任务的id,以后用id去查询任务是否执行完成
            -5 启动worker去执行任务:
            linux: celery worker -A celery_task_s1 -l info   
            windows下:celery worker -A celery_task_s1 -l info -P eventlet
            -6 查看结果:根据id去查询
                async = AsyncResult(id="bd600820-9366-4220-a679-3e435ae91e71", app=app)
                if async.successful():
                    #取出它return的值
                    result = async.get()
                    print(result)

    celery的多任务

    # celery的多任务结构
        -项目结构:
            pro_cel
                ├── celery_task# celery相关文件夹
                │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
                │   └── tasks1.py    #  所有任务函数
                │   └── tasks2.py    #  所有任务函数
                ├── check_result.py # 检查结果
                └── send_task.py    # 触发任务
        -启动worker,celery_task是包的名字
            celery worker -A celery_task -l info -P eventlet

     按照多任务文件结构创建文件:

    注意celery.py这个文件的文件名是固定的,不能改,task_1和task_2可以自己定义,他俩代表自定义的任务分类,还可以再创建task_3。。。等其它名字的任务文件,send_task.py是提交任务到worker,check_result.py是查看结果的

    # celery.py
    
    from celery import Celery
    broker = 'redis://:12345@127.0.0.1:6379/1'
    backend = 'redis://:12345@127.0.0.1:6379/2'
    # c1是实例化产生的celery的名字,因为会存在多个celery
    app = Celery('c1', broker=broker, backend=backend,
                 # 包含一些2个任务文件,去相应的py文件找任务,对多个任务进行分类
                 include=[
                     'celery_task.task_1',
                     'celery_task.task_2',
                 ])
    
    
    # celery提供一些配置,具体可查看官方文档
    # app.conf.timezone = 'Asia/Shanghai'

    在send_task.py种右键运行,提交任务到worker(这里打印了提交的2个任务的id)

    # task_1.py
    from celery_task.celery import app
    
    @app.task
    def add1(x, y):
        import time
        time.sleep(0.5)
        return x + y
    
    # task_2.py
    from celery_task.celery import app
    
    @app.task
    def add2(x, y):
        import time
        time.sleep(1)
        return x * y
    # send_task.py
    from celery_task.task_1 import add1
    from celery_task.task_2 import add2
    
    
    res1 = add1.delay(3, 8)
    print(res1)   # 16e847f3-fc14-4391-89e2-e2b3546872cf
    
    res2 = add2.delay(4, 9)
    print(res2)   # 858c0ae5-8516-4473-8be5-7501fb856ff4

    启动worker,celery_task是包的名字
    celery worker -A celery_task -l info -P eventlet

    然后将打印的2个id在check_result.py中进行查询结果

    # check_reslut.py
    from celery.result import AsyncResult
    from celery_task.celery import app
    
    for i in ['16e847f3-fc14-4391-89e2-e2b3546872cf', '858c0ae5-8516-4473-8be5-7501fb856ff4']:
        async = AsyncResult(id=i, app=app)    
        if async.successful():
            result = async.get()
            print(result)
        
        elif async.failed():
            print('执行失败')
        
        elif async.status == 'PENDING':
            print('任务等待中')
        
        elif async.status == 'RETRY':
            print('任务异常后重试')
        
        elif async.status == 'STARTED':
            print('任务正在执行')

    celery的定时任务

    方式一:执行时间在年月日时分秒

    在提交任务的地方修改:

    # send_task.py
    
    from celery_task.task_1 import add1
    from celery_task.task_2 import add2
    # 执行定时任务,3s以后执行add1、add2任务
    from datetime import datetime
    # 设置任务执行时间2019年7月12日21点45分12秒
    v1 = datetime(2019, 7, 12, 21, 48, 12)
    print(v1)  # 2019-07-12 21:45:12
    # 将v1时间转成utc时间
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)  # 2019-07-12 13:45:12
    # 取出要执行任务的时间对象,调用apply_async方法,args是任务函数传的参数,eta是执行的时间
    result1 = add1.apply_async(args=[3, 8], eta=v2)
    result2 = add2.apply_async(args=[4, 9], eta=v2)
    print(result1.id)
    print(result2.id)

    方式二:通过延迟执行的时间算出执行的具体utc时间,与方式一基本相同

    # send_task.py
    
    # 方式二:实际上和方法一类似,多了一个延迟时间,也就是用现在时间和推迟执行的时间计算出任务执行的最终utc时间
    # 然后也是调用apply_async方法。
    from datetime import datetime
    ctime = datetime.now()
    # 默认使用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    # 使用timedelta模块,拿到10秒后的时间对象,这里参数可以传秒、毫秒、微秒、分、小时、周、天
    time_delay = timedelta(seconds=10)
    # 得到任务运行时间:
    task_time = utc_ctime + time_delay
    result1 = add1.apply_async(args=[3, 8], eta=task_time)
    result2 = add2.apply_async(args=[4, 9], eta=task_time)
    print(result1.id)
    print(result2.id)

    celery的计划任务

    计划任务需要在celery.py中添加代码,然后需要beat一下,才能将计划开启

    # celery.py中
    
    from celery import Celery
    broker = 'redis://:12345@127.0.0.1:6379/1'
    backend = 'redis://:12345@127.0.0.1:6379/2'
    # c1是实例化产生的celery的名字,因为会存在多个celery
    app = Celery('c1', broker=broker, backend=backend,
                 # 包含一些2个任务文件,去相应的py文件找任务,对多个任务进行分类
                 include=[
                     'celery_task.task_1',
                     'celery_task.task_2',
                     'celery_task.task_3',
                 ])
    
    
    # celery提供一些配置,具体可查看官方文档
    # app.conf.timezone = "Asia/Shanghai"
    # app.conf.enable_utc = True
    
    
    # 计划任务
    from datetime import timedelta
    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'submit_every_2_seconds': {
            # 计划的任务执行函数
            'task': 'celery_task.task_1.add1',
            # 每个2秒执行一次
            'schedule': timedelta(seconds=2),
            # 传递的任务函数参数
            'args': (3, 9)
        },
        'submit_every_3_seconds': {
                # 计划的任务执行函数
                'task': 'celery_task.task_2.add2',
                # 每个3秒执行一次
                'schedule': timedelta(seconds=3),
                # 传递的任务函数参数
                'args': (4, 7)
        },
        'submit_in_fix_datetime': {
            'task': 'celery_task.task_3.add3',
            # 比如每年的7月13日10点53分执行
            # 注意:默认使用utc时间,当前的时间中的小时必须要-8个小时才会到点提交
            'schedule': crontab(minute=53, hour=2, day_of_month=13, month_of_year=7),
            
            '''
            # 如果不想-8,可以先设置时区,再按正常时间设置
            app.conf.timezone = "Asia/Shanghai"
            app.conf.enable_utc = True
            '''
            'args': ('Hello World',)
        }
    
    }
    
    # 上面写完后,需要起一个进程,启动计划任务
    # celery beat -A celery_task -l info
    
    # 启动worker:
    # celery worker -A celery_task -l info -P eventlet

     Django中使用celery

    django-celery:由于djang-celery模块对版本的要求过于严格,而且容易出现很多bug,所以不建议使用

    直接使用celery多任务结构的,将celery多任务结构的代码文件夹celery_task拷贝到Django项目中,然后在视图函数中进行任务提交、然后进行结构查看。(启动项目时候记得将worker启动起来,注意启动路径要跟你拷贝的celery_task文件同级)

    注意:当我们在Django项目中使用celery,在celery的任务函数中不能直接调用django的环境(比如orm方法查询数据库),需要添加代码调用Django环境

    在Python脚本中调用Django环境

    import os
    # 加载Django环境,bbs是所在的Django项目名称
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'bbs.settings')
        # 引入Django模块
        import django
        # 初始化Django环境
        django.setup()
        # 从app当中导入models
        from app01 import models
        # 调用操作,拿到数据库中的所有Book数据对象
        books = models.Books.objects.all()
  • 相关阅读:
    xheditor编辑器自动上传外链图片及QQ截图等(升级支持webp格式)
    jQuery对checkbox的各种操作
    Jquery过滤选择器,选择前几个元素,后几个元素,内容过滤选择器等
    Java8新特性之forEach+Lambda 表达式遍历Map和List
    My97日期控件My97 DatePicker选择每月最后一天(周6周日不能选,节假日不能选,高亮每个月最后一个股票交易日)
    算法系列:日历算法
    mysql 将多个查询结果合并成一行
    js判断浏览器类型以及语言
    升级至 spring-5.3.0 关于 jdbcTemplate.query(sql, parameters, rowMapper) 的解决
    解决 redis Increased maximum number of open files to 10032 (it was originally set to 256).
  • 原文地址:https://www.cnblogs.com/ppap/p/11181508.html
Copyright © 2011-2022 走看看