zoukankan      html  css  js  c++  java
  • celery


    Celery

    1.什么是Celery

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

    专门处理实时处理的异步任务队列

    同时也支持任务调度

    2.Celery架构

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成

    消息中间件

    Celery本身是不提供消息服务,但是可以方便的和第三方提供的消息中间价集成,包括RabbitMQ,Redis等

    任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    任务结果存储

    Task result store用来存储Worker执行的任务结果,Celery支持以不同方式存储任务的结果,包括AMQP,Redis

    版本支持情况

    Celery version 4.0 runs on
            Python ❨2.7, 3.4, 3.5❩
            PyPy ❨5.4, 5.5❩
        This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
    
        If you’re running an older version of Python, you need to be running an older version of Celery:
    
            Python 2.6: Celery series 3.1 or earlier.
            Python 2.5: Celery series 3.0 or earlier.
            Python 2.4 was Celery series 2.2 or earlier.
    
        Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

    3.使用场景

    异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信、邮件、消息推送、音视频处理等等

    定时任务:定时执行某件事情,比如每天数据统计

    4.Celery的安装配置

    pip install celery

    消息中间件:RabbitMQ/Redis

    app = Celery('任务名',backend='xxx',broker='xxx')

    5.Celery执行异步任务

    基本使用

    创建项目celerytest

    创建py文件:celery_app_task_.py

    import celery
    import time
    broker = 'redis://127.0.0.1:6379/1' 
    #消息中间件
    #redis不加密,放到第1个库中
    # broker = 'reids://:123@127.0.0.1:6379/1'  redis加密,密码是123
    
    #结果存储
    backend = 'reids://127.0.0.1:6379/2'  # redis不加密,放到第二个库中
    
    # 实例化产生一个Celery对象,一定要指定名字
    cel = celery.Celery('test',backend=backend,broker=broker)
    
    #任务其实就是一个函数
    #需要用一个装饰器装饰,表示该任务是被celery管理的,并且可以用celery执行的
    @cel.task
    def add(x,y):
        time.sleep(2)
        return x+y

    提交任务的py文件:add_task.py

    import celery_task_s1
    res = celery_task_s1.add.delay(3,4)
    print(res)
    # res就是任务的id号

    右击运行提交任务,

    启动worker,使用命令执行:celery worker -A celery_task_s1 -l info

    注意:windows下:celery worker -A celery_task_s1 -l info -P eventlet

    存储任务结果的py文件:celery_result.py

    from celery.result import AsyncResult
    from celery_task_s1 import cel
    # 根据id去查询它的执行结果
    async = AsyncResult(id="a5ea035f-0cc3-44ba-b334-f5d7c7ce681d", app=cel)
    if async.successful():
        #取出它return的值
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

    执行add_task.py,添加任务,并获取任务id

    执行celery_result.py,检查任务状态并获取结果

    执行定时任务,3s钟以后执行add任务

    第一种获取时间的方法

    from datetime import datetime
    v1 = datetime(2019,7,12,16,15,56)
    print(v1)
    #默认用UTC时间
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)
    result = celery_task_s1.add.apply_async(args=[1,3],eta=v2)
    print(result,id)
    

    第二种获取时间的方法

    from datetime import timedelta
    ctime = datetime.now()
    #默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=3)
    task_time = utc_ctime + time_delay
    result = celery_task_s1.add.apply_async(args=[2,8],eta=task_time)

    多任务结构

    # 项目结构
    pro_cel
        ├── celery_task# celery相关文件夹
        │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
        │   └── tasks1.py    #  所有任务函数
        │   └── tasks2.py    #  所有任务函数
        ├── check_result.py # 检查结果
        └── add_task.py    # 触发任务

    celery.py文件

    #必须叫celery,生成celery对象
    
    from celery import Celery
    from datetime import timedelta
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    cel = Celery('test',broker=broker,backend=backend,
                 include=[
                     'celery_task.order_task',
                     'celery_task.user_task'
                 ])
    
    #时区
    cel.conf.timezone = 'Asia/shanghai'
    # 是否使用UTC
    cel.conf.enable_utc = False

    order_task.py

    from celery_task.celery import cel
    
    @cel.task
    def order_add(x,y):
        import time
        time.sleep(2)
        return x+y
    

    user_task.py

    from celery_task.celery import cel
    @cel.task
    def user_add(x,y):
        import time
        time.sleep(2)
        return x+y
    

    add_task.py

    from celery_task.order_task import order_add
    from celery_task.user_task import user_add
    
    
    res=order_add.delay(5,6)
    print(res.id)
    res=user_add.delay(10,60)
    print(res.id)
    

    celery_result.py

    from celery.result import AsyncResult
    from celery_task.celery import cel
    
    async = AsyncResult(id="c8815fd0-c126-4fed-b908-805974761381", app=cel)
    
    if async.successful():
        #取出它return的值
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。  
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    添加任务(执行send_task.py),开启work:celery worker -A celery_task -l info -P eventlet,检查任务执行结果(执行check_result.py)

    定时任务:多任务结构中celery.py修改如下

    from datetime import timedelta
    from celery import Celery
    from celery.schedules import crontab
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    cel = Celery('test',broker=broker,backend=backend,
                 include=[
                     'celery_task.order_task',
                     'celery_task.user_task'
                 ])
    
    #时区
    cel.conf.timezone = 'Asia/shanghai'
    # 是否使用UTC
    cel.conf.enable_utc = False
    
    cel.conf.beat_schedule = {
        # 名字随意命名
        'add_every_10_seconds':{
            'test':'celery_task.order_task.order_add',
            # 每隔2秒执行一次
            # 'sehedule':1.0,
            # 'schedule':crontab(minute="*/1"),
            'schedule':timedelta(seconds=2),
            # 传递参数
            'args':(5,6)
        },
        # 'add-every-12-seconds': {
        #     'task': 'celery_task.tasks1.test_celery',
        #     每年4月11号,8点42分执行
        #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        #     'args': (16, 16)
        # },
    }
    

    启动一个beat:celery beat -A celery_task -l info

    启动work执行:celery worker -A celery_task -l info -P eventlet

    6.django中使用celery

    多任务结构的celery_task文件夹直接拷到根目录下

    views.py中

    from django.shortcuts import render,HttpResponse
    
    from celery_task.user_task import user_add
    
    def index(request):
        result = user_add.delay(8,9)
        return HttpResponse(result.id)
    
    def check_result(request):
        res = request.GET.get('id')
        from celery.result import AsynResult
        from celery_task.celery import cel
        async = AsynResult(id=res,app=cel)
        if async.sucessful():
            result = async.get()
            print(result)
            return HttpResponse('ok')
    

    注意:上述做完后别忘了启动worker,再配置路由就可以了

    强调:在celery的任务函数中不能直接调用django的环境(也就是不能直接操作数据库),需要手动添加

    os.environ.setdefault('DJANGO_SETTINGS_MODEL','untitled15.settings')

    import django

    django.setup()

    补充

    pipreq:生成项目依赖的第三方包

    项目依赖:pip3 install pipreqs

    生成依赖文件:pipreqs ./

    安装依赖文件:pips install -r requirements.txt

    方法和函数

    他们是有区别的

    方法:比如一个类的对象去调用类内部写的函数就是方法

    函数:类直接调用内部的函数就是函数

    偏函数:先给函数传一个值进去

    from functions import partial
    def test(x,y,z):
        return x+y+z
    
    tsst = partial(test,1)
    print(test(2,3))  # 先传一个值进去,之后再传只需传两个就可以了
    

    轮询和长轮询

    轮询:不停的发请求

    长轮询:hang一会儿再发请求

  • 相关阅读:
    6.Mysql事务索引备份视图
    5.MySQL函数
    4.Mysql:使用DQL查询数据
    JavaScript的柯里化和反柯里化
    主流浏览器的内核以及Hack写法
    JavaScript的arguments对象
    JavaScript自定义事件监听
    星空雅梦
    星空雅梦
    星空雅梦
  • 原文地址:https://www.cnblogs.com/Mr-bear/p/11186189.html
Copyright © 2011-2022 走看看