zoukankan      html  css  js  c++  java
  • celery——使用

     一、celery基本使用

    1 写一个py文件 (s1.py--->任务写好后在此运行会拿到任务id)
        import celery
        # 消息中间件(redis)
        broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db
        # 结果存储(redis)
        backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db
        # 实例化得到对象,指定中间件和结果存储
        app=celery.Celery('test',broker=broker,backend=backend)
        @app.task
        def add(a,b):
            return a+b
        @app.task
        def mul(a,b):
            return a*b
        
    2 提交任务(在其它文件中)
        from t_celery import add, mul
        res=add.delay(100,4) 
        print(res)  # id号
        
    3 启动worker(启动work,使用命令启动,需要在提交任务的位置下启动)
        # 非windows平台:celery worker -A t_celery -l info
        # windows装eventlet:celery worker -A t_celery -l info -P eventlet
    #注意:因celery在windows下适用性不好,提交任务最好先安装eventlet模块,另外5.0及其以上版本无法使用-A,所以安装celery时候要安装4.x版本
    4 查看执行结果(在此文件中输入任务id,运行此文件就可以查看到任务运行的状态了) from t_celery import app from celery.result import AsyncResult # 关键字,变量不能定义为关键字 id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00' if __name__ == '__main__': res = AsyncResult(id=id, app=app) if res.successful(): result = res.get() print(result) elif res.failed(): print('任务失败') elif res.status == 'PENDING': print('任务等待中被执行') elif res.status == 'RETRY': print('任务异常后正在重试') elif res.status == 'STARTED': print('任务已经开始被执行')

    二、celery多任务结构

    1.包架构封装(多任务结构)

    project
        ├── celery_task      # celery包
        │   ├── __init__.py # 包文件
        │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
        │   └── tasks.py    # 所有任务函数
        ├── add_task.py      # 添加任务
        └── get_result.py   # 获取结果

    2.使用

    #实际多任务结构包架构
    package_celery:     # 项目名
        celery_task     # celery包名
            __init__.py 
            celery.py   # celery 的app,必须叫celery
            order_task.py # 任务
            user_task.py  # 任务
        result.py         # 结果查询
        submit_task.py    # 提交任务
        
        
    # 运行worker(要 cd package_celery 在 package_celery目录下执行,但是celery的app名字必须叫celery,否则找不到)
        celery worker -A celery_task -l info -P eventlet
    # 提交任务
        from celery_task import order_task,user_task
        # 提交一个给用户发短信的任务
        res=user_task.send_sms.delay('18723454566')
        print(res)
        # 提交一个取消订单任务
        res=order_task.cancel_order.delay()
        print(res)

    3.代码

    celery.py

    import celery
    
    broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db
    
    backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db
    
    app=celery.Celery('test',broker=broker,backend=backend,
                      include=['celery_task.order_task','celery_task.user_task']
                      )

    order_task.py

    from .celery import app
    
    @app.task
    def process_order(a, b):
        print(a)
        print(b)
        return '订单处理完了'
    @app.task
    def cancel_order():
        import random
        res = random.choice([1, 0])
        if res == 0:
            print('订单状态改了,取消订单了')
            return True
        else:
            print('订单取消失败')
            return False

    user_task.py

    from .celery import app
    
    @app.task
    def send_sms(phone):
        import time
        time.sleep(1)
        print('%s短信发送成功'%phone)
        return '%s短信发送成功'%phone

    result.py

    from celery_task.celery import app
    from celery.result import AsyncResult
    
    # 关键字,变量不能定义为关键字
    
    #发短信任务: bbfbb809-71fd-4dd7-bb3b-e84590840819
    # 取消订单任务:4595fd2f-1f8d-4546-8176-8a0627803584
    id = '4595fd2f-1f8d-4546-8176-8a0627803584'
    if __name__ == '__main__':
        res = AsyncResult(id=id, app=app)
        if res.successful():
            result = res.get()
            print(result)
        elif res.failed():
            print('任务失败')
        elif res.status == 'PENDING':
            print('任务等待中被执行')
        elif res.status == 'RETRY':
            print('任务异常后正在重试')
        elif res.status == 'STARTED':
            print('任务已经开始被执行')

    submit_task.py   

    注意:在celery_task层输入执行任务命令,但是app名必须叫celery,不然找不到

    from celery_task import order_task,user_task
    
    
    # 提交一个给用户发短信的任务
    # res=user_task.send_sms.delay('18723454566')
    # print(res)
    # # 提交一个取消订单任务
    # res=order_task.cancel_order.delay()
    # print(res)

    3.真实应用场景

    # 真实应用场景
        -秒杀系统
            -不能秒杀,使用锁(mysql悲观锁,乐观锁),redis锁
            -提高并发量---》把同步做成异步---》使用celery
                -前端点击秒杀按钮,向后端发送秒杀请求---》同步操作
                    -同步操作
                        -请求来到后端,判断数量是否够,如果够,要生成订单(mysql),订单状态是待支付状态                        
                        -请求返回,告诉前端,秒杀成功
                    -异步操作(提交项目并发量)
                        -请求来到后端,提交一个celery任务---》celery任务异步的执行判断数量是否够,如果够,要生成订单(mysql)
                        -秒杀是否成功的结果还没有,直接返回了(返回任务id)
                        -前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完成(带着任务id查)
                        -如果是未执行状态,或者执行中---》返回给前端,前端不处理,定时任务继续执行
                        -又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功

     三、高级使用之延时任务

    # celery执行延时任务
    ## 第一种方式:2021年1月7日17点3分12秒发送短信
    # from datetime import datetime
    # # # eta:延迟多长时间执行,eta需要传时间对象,并且是utc时间
    # v1 = datetime(2021, 1, 7, 17, 3, 12)
    # print(v1) #2021-01-07 17:01:56
    # v2 = datetime.utcfromtimestamp(v1.timestamp())
    # print(v2) #2021-01-07 09:01:56
    # res=user_task.send_sms.apply_async(args=['172****4332',],eta=v2)
    
    
    ## 第二种方式:隔几秒后执行
    from datetime import datetime
    from datetime import timedelta
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    print(task_time)
    res=user_task.send_sms.apply_async(args=['172****4332',],eta=task_time)

    四、高级使用之定时任务

    # 在celery.py中配置
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab
    
    app.conf.beat_schedule = {
        'send-msg':{
            'task': 'celery_task.user_task.send_sms',
            # 'schedule': timedelta(hours=24*10),
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
            'schedule': crontab(hour=8, day_of_month=1),  # 每月一号早八点
            'args': ('18964352112',),
        }
    }
    
    
    # 启动beat,负责每隔3s提交一个任务
    celery beat -A celery_task -l info
    # 启动worker
    celery worker -A celery_task -l info -P eventlet
  • 相关阅读:
    获取class
    domReady
    JS原型
    JavaScript继承
    LeetCode 46. Permutations
    LinkCode 第k个排列
    接口测试基础——第5篇xlrd模块
    接口测试基础——第4篇logging模块
    接口测试基础——第3篇smtplib发送带图片的邮件
    接口测试基础——第2篇smtplib发送带附件的邮件
  • 原文地址:https://www.cnblogs.com/guojieying/p/14284200.html
Copyright © 2011-2022 走看看