zoukankan      html  css  js  c++  java
  • celery

    一、celery简介,架构

    1 celery:芹菜(跟芹菜没有任何关系)
    2 python中的一个分布式任务框架
        执行异步任务(对立:同步任务):解决耗时任务,将耗时操作任务提交给celery去异步执行,比如发送短信/邮箱、消息推送、音频处理等等
        执行延时任务(5分钟后干一件事):解决延迟任务
        执行定时任务:每天,隔几分钟,干什么事:解决周期任务,比如每天统计数据
    
    3 解释
    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.(不怎么支持windos)
    
    4 celery特点(了解)
        
       可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
        celery服务为其他项目提供异步解决任务的需求
    5 Celery架构
        Celery的架构由三部分组成,消息中间件(message,broker)、任务执行单元(worker)和任务执行结果存储(task result store)组成
    
    消息中间件
    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括RabbiMQ,Redis等等
    
    任务执行单元
    
    worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中

    #任务结果存储
    Task result store用来存储Worker执行的任务的结果,Celery支持以不同的方式存储任务的结果,Celery支持以不同的
    方式存储任务的结果,包括AMQP,redis等





      

    二、celery基本使用

    1 写一个py文件
        import celery
        #消息中间件
        broker='redis://127.0.0.1:6379/1'  #1 表示使用redis 1 这个db
        #结果存储(redis)
        backend='redis://127.0.0.1:6379/2' #2 表示使用redis 2 这个db
        #实例化得到的对象,指定中间件和结果存储
        app = celery.Celery('test',broker=broker,backend=backend)
        @app.task
        def add(a,b):
            return a+b
        @app.task
        def mul(a,b):
            return a*b
    2 提交任务(在其他文件中)
    from t_celery import add,mull
    res=add.delay(100,4) #delay表示异步
     print(res) #id号

    3 启动worker
      #非window平台:celery worker -A t_celery -l info #info代表日志级别
      # windows装eventlet:celery worker -A t_celery -l info -P eventlet
    4 查看执行结果
    from t_celery import app
    from celery.result import AsyncResult
    #关键字,变量不能定义为关键字
    id='5331c70b-1b51-4a15-aa17-2fa0f7952c00'
    if __name__ == '__main__':
      res=AsyncResult(id=id,app=app)
      if res.successful():
        result = res.get()
        print(result)
      elif res.failed():
        print('任务失败')
      elif res.status == 'PENDING':
        print('任务等待中被执行')
      elif res.status == 'RETRY':
        print('任务异常后正在重试')
      elif res.status == 'STARTED':
        print('任务以及开始被执行')
      








    三、celery多任务结构

    package_celery:     # 项目名
        celery_task     # celery包名 (启动celery worker的路径)
            __init__.py 
            celery.py   # celery 的app,必须叫celery
            order_task.py # 任务
            user_task.py  # 任务
        result.py         # 结果查询
        submit_tast.py    # 提交任务

    #运行worker(在package_celery目录下执行)
    celery worker -A celery_task -l info -P eventlet
    #提交任务
    from celery_task import order_task,user_task
    #提交一个给用户发短信的任务
    res=user_task.send_sms.delay('1872465747')
    print(res)
    #提交一个取消订单任务
    res=order_task.cancel_order.delay()
    print(res)

    # 真实应用场景
    -秒杀系统
    -不能秒超,使用锁(mysql悲观锁,乐观锁),redis锁
    -提高并发量---》把同步做成异步---》使用celery
    -前端点击秒杀按钮,向后端发送秒杀请求---》同步操作
    -同步操作
    -请求来到后端,判断数量是否够,如果够,要生成订单(mysql),订单状态是待支付状 态
    -请求返回,告诉前端,秒杀成功
    -异步操作
    -请求来到后端,提交一个celery任务---》celery任务异步的执行判断数量是否够,如 果够,要生成订单(mysql)
    -秒杀是否成功的结果还没有,直接返回了(返回任务id)
    -前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完 成(带着任务id查)
    -如果是未执行状态,或者执行中---》返回给前端,前端不处理,定时任务继续执行
    -又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功



    注意:在用celery多任务结构的时候,启动celery worker 时文件的路径,是celery包名,不是celery文件夹

    四、高级使用之延时任务

    #celery执行延时任务
    
    #第一种方式:2021年1月7日17点3分12秒发送短信
    
    #from datetime import datetime
    
    #eta:延迟多长时间执行,eta需要传时间对象,并且是utc时间
    #v1 = datetime(2021,1,7,17,3,12)
    print(v1)
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)
    #args是真正的函数传的参数,eta传的是时间
    res=user_task.send_sms.apply_async( args=['1879531248',],eta=v2)
    
    ##第二种方式:隔几秒后执行(用的多)
    from datetime import datetime from datetime import timedelta ctime = datetime.now() #默认用utc时间 utc_time = datetime.utcfromtimestamp(ctime.timestamp()) time_delay = timedelta(seconds=10)

    task_time
    = utc_time + time_delay #(延迟的时间) print(task_time)
    #args(是函数真正传的参数,) eta(是延迟的时间) res
    =user_task.send_sms.apply_async(args=['18945456456',],eta=task_time)

     五、定时任务

    # 在celery.py中配置
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab
    
    app.conf.beat_schedule = {
      #要执行几个就配置几个 'send-msg':{ 'task': 'celery_task.user_task.send_sms', # 'schedule': timedelta(hours=24*10), # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'schedule': crontab(hour=8, day_of_month=1), # 每月一号早八点 'args': ('18964352112',), } } # 用定时任务必须启动beat,负责每隔3s提交一个任务(自动提交) celery beat -A celery_task -l info # 启动worker celery worker -A celery_task -l info -P eventlet  

     六、django中使用celery

    1 celery是独立的,跟框架没有关系
    2 Django-celery第三方模块,兼容性不好,一般不采用
    3 目录
    celery_task
            __init__.py
            celery.py
            home_task.py
            order_task.py
            user_task.py
    luffyapi

    以后使用可以直接复制这个目录结构

    七、在路飞中具体使用

    7.1 home_task.py

    from celery_task.celery import app
    
    @app.task
    def update_banner():
        from django.core.cache import cache
        from django.conf import settings
        from home import models
        from home import serializer
        banners=models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]
    
        ser = serializer.BannerModelSerializer(instance=banners,many=True)
        banner_data=ser.data
    
        # 拿不到request对象,所以头像的连接base_url要自己组装
        for banner in banner_data:
            banner['img'] = 'http://127.0.0.1:8000%s' % banner['img']
    
    
        cache.set('banner_data',banner_data)
    
        return True

    7.2 celery.py

    import celery
    
    import os
    # 执行django配置文件,环境变量加入
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
    
    
    
    
    broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db
    
    backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db
    
    app=celery.Celery('test',broker=broker,backend=backend,
                      include=['celery_task.order_task','celery_task.user_task','celery_task.home_task']
                      )
    
    
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab
    
    app.conf.beat_schedule = {
    
        # 'send-msg':{
        #     'task': 'celery_task.user_task.send_sms',
        #     # 'schedule': timedelta(hours=24*10),
        #     # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        #     'schedule': crontab(hour=8, day_of_month=1),  # 每月一号早八点
        #     'args': ('18964352112',),
        # }
        'update-banner':{
            'task': 'celery_task.home_task.update_banner',
            'schedule': timedelta(seconds=10),
            'args': (),
        }
    }

    views.py

    from celery_task import user_task
    from celery_task.celery import app
    from celery.result import AsyncResult
    
    
    def test_celery(request):
        res_id = request.GET.get('id')
        if res_id:
            res = AsyncResult(id=res_id, app=app)
            if res.successful():
                result = res.get()
                print(result)
                return HttpResponse('执行完成了,结果是:%s' % result)
    
        res = user_task.send_sms.delay('18276345221')
        return HttpResponse('任务号是:%s' % str(res))

    首页轮播图定时更新

    1 把首页轮播图接口改成,先去缓存中取,缓存中没有,再去数据库查
    2 首页轮播图加入了缓存
    3 以后,如果你的接口,请求慢,第一反应就是先使用缓存
    
    
    4 使用celery定时更新缓存

    class BannerView(GenericViewSet, ListModelMixin):
        queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]
        serializer_class = serializer.BannerModelSerializer
    
        # 改成,先从缓存中取,缓存中如果有,直接返回,没有,再去数据库查
    
        def list(self, request, *args, **kwargs):
            # 如果缓存中有值,直接取出来返回,速度很快
            banner_data = cache.get('banner_data')
            if banner_data:
    
                print('走了缓存')
                return Response(data=banner_data)
    
            # 如果缓存中没有,再走数据,查出来,放到缓存中
            res = super().list(request, *args, **kwargs)
            # 把首页轮播图数据放到缓存中
            cache.set('banner_data', res.data)
            print('没走缓存')
            return res
  • 相关阅读:
    基于xtrabackup的PointInTime Recovery备份恢复
    使用折半查找法删除
    五月第二周
    MySQL 5.5 外键不能引用分区表主键
    MYSQL 登录漏洞,Percona Server说明
    Detectron2学习笔记 Sanny.Liu
    取客户MAP地址
    DataReader转实体<T>
    从程序员到翻译的感受
    .net中的浅拷贝和深拷贝
  • 原文地址:https://www.cnblogs.com/ltyc/p/14250931.html
Copyright © 2011-2022 走看看