zoukankan      html  css  js  c++  java
  • celery 分布式异步任务框架(celery简单使用、celery多任务结构、celery定时任务、celery计划任务、celery在Django项目中使用Python脚本调用Django环境)

    一、celery简介:

    Celery 是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    可以看到,Celery 主要包含以下几个模块:

    • 任务模块 Task

      包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

    • 消息中间件 Broker

      Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

    • 任务执行单元 Worker

      Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

    • 任务结果存储 Backend

      Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redis 和 MongoDB 等。

     所以总结一下celery:它是一个处理大量消息的分布式系统,能异步任务、定时任务,使用场景一般用于耗时操作的多任务或者定时性的任务

    二、celery安装与使用

    pycharm安装:

    pip3 install celery

     初步使用:(创建一个Python项目)

    ① 实例化一个celery对象,使用该对象.task装饰需要管理的任务函数:

    # celery_task.py
    
    from celery import Celery
    
    """
    # 如果redis没有设置密码
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    """
    broker = 'redis://:12345@127.0.0.1:6379/1'
    backend = 'redis://:12345@127.0.0.1:6379/2'
    # c1是实例化产生的celery的名字,因为会存在多个celery
    app = Celery('c1', broker=broker, backend=backend)
    
    # 需要使用一个装饰器,来管理该任务(函数)
    @app.task
    def add(x, y):
        import time
        time.sleep(1)
        return x + y

    ② 将装饰的任务函数条件到消息队列中,此时提交的任务函数并没有执行,只是提交到worker,它会返回一个标识任务的字符串

    # submit.task.py
    
    # 用于提交任务
    from celery_task import add
    # 提交任务到消息队列中,这里只是将任务提交,并没有执行
    
    res = add. delay(3, 8)
    print(res)
    # 结果是标识任务的字符串(id号)
    # 7811a028-428c-4dd5-9135-788e26e694a7

    ③ 使用命令启动worker去刚才提交的执行任务

    linux: celery worker -A celery_task -l info   
    windows下:celery worker -A celery_task -l info -P eventlet

    ④ 查看结果,根据提交任务返回的字符串去查询

    # check_res.py
    
    from celery.result import AsyncResult
    from celery_task import app
    
    async = AsyncResult(id='bd600820-9366-4220-a679-3e435ae91e71', app=app)
    
    if async.successful():
        result = async.get()
        print(result)
    
    elif async.failed():
        print('执行失败')
    
    elif async.status == 'PENDING':
        print('任务等待中')
    
    elif async.status == 'RETRY':
        print('任务异常后重试')
    
    elif async.status == 'STARTED':
        print('任务正在执行')

    celery简单使用流程:

    -celery的使用
        -pip3 install celery
        -写一个py文件:celery_task
            -1 指定broker(消息中间件),指定backend(结果存储)
            -2 实例化产生一个Celery对象 app=Celery('名字',broker,backend)
            -3 加装饰器绑定任务,在函数(add)上加装饰器app.task
            -4 其他程序提交任务,先导入add,add.delay(参,参数),会将该函数提交到消息中间件,但是并不会执行,有个返回值,直接print会打印出任务的id,以后用id去查询任务是否执行完成
            -5 启动worker去执行任务:
            linux: celery worker -A celery_task_s1 -l info   
            windows下:celery worker -A celery_task_s1 -l info -P eventlet
            -6 查看结果:根据id去查询
                async = AsyncResult(id="bd600820-9366-4220-a679-3e435ae91e71", app=app)
                if async.successful():
                    #取出它return的值
                    result = async.get()
                    print(result)

    celery的多任务

    # celery的多任务结构
        -项目结构:
            pro_cel
                ├── celery_task# celery相关文件夹
                │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
                │   └── tasks1.py    #  所有任务函数
                │   └── tasks2.py    #  所有任务函数
                ├── check_result.py # 检查结果
                └── send_task.py    # 触发任务
        -启动worker,celery_task是包的名字
            celery worker -A celery_task -l info -P eventlet

     按照多任务文件结构创建文件:

    注意celery.py这个文件的文件名是固定的,不能改,task_1和task_2可以自己定义,他俩代表自定义的任务分类,还可以再创建task_3。。。等其它名字的任务文件,send_task.py是提交任务到worker,check_result.py是查看结果的

    # celery.py
    
    from celery import Celery
    broker = 'redis://:12345@127.0.0.1:6379/1'
    backend = 'redis://:12345@127.0.0.1:6379/2'
    # c1是实例化产生的celery的名字,因为会存在多个celery
    app = Celery('c1', broker=broker, backend=backend,
                 # 包含一些2个任务文件,去相应的py文件找任务,对多个任务进行分类
                 include=[
                     'celery_task.task_1',
                     'celery_task.task_2',
                 ])
    
    
    # celery提供一些配置,具体可查看官方文档
    # app.conf.timezone = 'Asia/Shanghai'

    在send_task.py种右键运行,提交任务到worker(这里打印了提交的2个任务的id)

    # task_1.py
    from celery_task.celery import app
    
    @app.task
    def add1(x, y):
        import time
        time.sleep(0.5)
        return x + y
    
    # task_2.py
    from celery_task.celery import app
    
    @app.task
    def add2(x, y):
        import time
        time.sleep(1)
        return x * y
    # send_task.py
    from celery_task.task_1 import add1
    from celery_task.task_2 import add2
    
    
    res1 = add1.delay(3, 8)
    print(res1)   # 16e847f3-fc14-4391-89e2-e2b3546872cf
    
    res2 = add2.delay(4, 9)
    print(res2)   # 858c0ae5-8516-4473-8be5-7501fb856ff4

    启动worker,celery_task是包的名字
    celery worker -A celery_task -l info -P eventlet

    然后将打印的2个id在check_result.py中进行查询结果

    # check_reslut.py
    from celery.result import AsyncResult
    from celery_task.celery import app
    
    for i in ['16e847f3-fc14-4391-89e2-e2b3546872cf', '858c0ae5-8516-4473-8be5-7501fb856ff4']:
        async = AsyncResult(id=i, app=app)    
        if async.successful():
            result = async.get()
            print(result)
        
        elif async.failed():
            print('执行失败')
        
        elif async.status == 'PENDING':
            print('任务等待中')
        
        elif async.status == 'RETRY':
            print('任务异常后重试')
        
        elif async.status == 'STARTED':
            print('任务正在执行')

     

    celery的定时任务

    方式一:执行时间在年月日时分秒

    在提交任务的地方修改:

    # send_task.py
    
    from celery_task.task_1 import add1
    from celery_task.task_2 import add2
    # 执行定时任务,3s以后执行add1、add2任务
    from datetime import datetime
    # 设置任务执行时间2019年7月12日21点45分12秒
    v1 = datetime(2019, 7, 12, 21, 48, 12)
    print(v1)  # 2019-07-12 21:45:12
    # 将v1时间转成utc时间
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)  # 2019-07-12 13:45:12
    # 取出要执行任务的时间对象,调用apply_async方法,args是任务函数传的参数,eta是执行的时间
    result1 = add1.apply_async(args=[3, 8], eta=v2)
    result2 = add2.apply_async(args=[4, 9], eta=v2)
    print(result1.id)
    print(result2.id)

    方式二:通过延迟执行的时间算出执行的具体utc时间,与方式一基本相同

    在提交任务的地方修改:

    # send_task.py
    
    # 方式二:实际上和方法一类似,多了一个延迟时间,也就是用现在时间和推迟执行的时间计算出任务执行的最终utc时间
    # 然后也是调用apply_async方法。
    from datetime import datetime
    ctime = datetime.now()
    # 默认使用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    # 使用timedelta模块,拿到10秒后的时间对象,这里参数可以传秒、毫秒、微秒、分、小时、周、天
    time_delay = timedelta(seconds=10)
    # 得到任务运行时间:
    task_time = utc_ctime + time_delay
    result1 = add1.apply_async(args=[3, 8], eta=task_time)
    result2 = add2.apply_async(args=[4, 9], eta=task_time)
    print(result1.id)
    print(result2.id)

    celery的计划任务

    计划任务需要在celery.py中添加代码,然后需要beat一下,才能将计划开启

    # celery.py中
    
    from celery import Celery
    broker = 'redis://:12345@127.0.0.1:6379/1'
    backend = 'redis://:12345@127.0.0.1:6379/2'
    # c1是实例化产生的celery的名字,因为会存在多个celery
    app = Celery('c1', broker=broker, backend=backend,
                 # 包含一些2个任务文件,去相应的py文件找任务,对多个任务进行分类
                 include=[
                     'celery_task.task_1',
                     'celery_task.task_2',
                     'celery_task.task_3',
                 ])
    
    
    # celery提供一些配置,具体可查看官方文档
    # app.conf.timezone = "Asia/Shanghai"
    # app.conf.enable_utc = True
    
    
    # 计划任务
    from datetime import timedelta
    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'submit_every_2_seconds': {
            # 计划的任务执行函数
            'task': 'celery_task.task_1.add1',
            # 每个2秒执行一次
            'schedule': timedelta(seconds=2),
            # 传递的任务函数参数
            'args': (3, 9)
        },
        'submit_every_3_seconds': {
                # 计划的任务执行函数
                'task': 'celery_task.task_2.add2',
                # 每个3秒执行一次
                'schedule': timedelta(seconds=3),
                # 传递的任务函数参数
                'args': (4, 7)
        },
        'submit_in_fix_datetime': {
            'task': 'celery_task.task_3.add3',
            # 比如每年的7月13日10点53分执行
            # 注意:默认使用utc时间,当前的时间中的小时必须要-8个小时才会到点提交
            'schedule': crontab(minute=53, hour=2, day_of_month=13, month_of_year=7),
            
            '''
            # 如果不想-8,可以先设置时区,再按正常时间设置
            app.conf.timezone = "Asia/Shanghai"
            app.conf.enable_utc = True
            '''
            'args': ('Hello World',)
        }
    
    }
    
    # 上面写完后,需要起一个进程,启动计划任务
    # celery beat -A celery_task -l info
    
    # 启动worker:
    # celery worker -A celery_task -l info -P eventlet

     Django中使用celery

    django-celery:由于djang-celery模块对版本的要求过于严格,而且容易出现很多bug,所以不建议使用

    直接使用celery多任务结构的,将celery多任务结构的代码文件夹celery_task拷贝到Django项目中,然后在视图函数中进行任务提交、然后进行结构查看。(启动项目时候记得将worker启动起来,注意启动路径要跟你拷贝的celery_task文件同级)

    注意:当我们在Django项目中使用celery,在celery的任务函数中不能直接调用django的环境(比如orm方法查询数据库),需要添加代码调用Django环境

    在Python脚本中调用Django环境

    import os
    # 加载Django环境,bbs是所在的Django项目名称
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'bbs.settings')
        # 引入Django模块
        import django
        # 初始化Django环境
        django.setup()
        # 从app当中导入models
        from app01 import models
        # 调用操作,拿到数据库中的所有Book数据对象
        books = models.Books.objects.all()
  • 相关阅读:
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之用户管理(1)
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之创建Viewport(2)
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之用户管理(2)
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之完成登录功能
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之登录窗口调试
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之创建Viewport(1)
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之创建输出验证码图片的控制器
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之调整首页显示
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之登录窗口
    一步一步使用Ext JS MVC与Asp.Net MVC 3开发简单的CMS后台管理系统之用户管理(3)
  • 原文地址:https://www.cnblogs.com/suguangti/p/11178128.html
Copyright © 2011-2022 走看看