zoukankan      html  css  js  c++  java
  • celery的基础使用等相关内容-125

    1 celery基本使用

    1 写一个py文件
       import celery
       # 消息中间件(redis)
       broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db
       # 结果存储(redis)
       backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db
       # 实例化得到对象,指定中间件和结果存储
       app=celery.Celery('test',broker=broker,backend=backend)
       @app.task
       def add(a,b):
           return a+b
       @app.task
       def mul(a,b):
           return a*b
       
    2 提交任务(在其它文件中)
       from t_celery import add, mul
       res=add.delay(100,4)
       print(res)  # id号
       
    3 启动worker
    # 非windows平台:celery worker -A t_celery -l info
       # windows装eventlet:celery worker -A t_celery -l info -P eventlet
       
    4 查看执行结果
    from t_celery import app
       from celery.result import AsyncResult
       # 关键字,变量不能定义为关键字
       id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00'
       if __name__ == '__main__':
           res = AsyncResult(id=id, app=app)
           if res.successful():
               result = res.get()
               print(result)
           elif res.failed():
               print('任务失败')
           elif res.status == 'PENDING':
               print('任务等待中被执行')
           elif res.status == 'RETRY':
               print('任务异常后正在重试')
           elif res.status == 'STARTED':
               print('任务已经开始被执行')

     

    2 celery多任务结构

    package_celery:     # 项目名
       celery_task     # celery包名
           __init__.py
           celery.py   # celery 的app,必须叫celery
           order_task.py # 任务
           user_task.py  # 任务
       result.py         # 结果查询
       submit_tast.py    # 提交任务
       
       
    # 运行worker(在package_celery目录下执行)
    celery worker -A celery_task -l info -P eventlet
    # 提交任务
       from celery_task import order_task,user_task
       # 提交一个给用户发短信的任务
       res=user_task.send_sms.delay('18723454566')
       print(res)
       # 提交一个取消订单任务
       res=order_task.cancel_order.delay()
       print(res)

       
    # 真实应用场景
    -秒杀系统
      -不能秒超,使用锁(mysql悲观锁,乐观锁),redis锁
           -提高并发量---》把同步做成异步---》使用celery
          -前端点击秒杀按钮,向后端发送秒杀请求---》同步操作
              -同步操作
                       -请求来到后端,判断数量是否够,如果够,要生成订单(mysql),订单状态是待支付状
                       -请求返回,告诉前端,秒杀成功
                   -异步操作
                  -请求来到后端,提交一个celery任务---》celery任务异步的执行判断数量是否够,如 果够,要生成订单(mysql)
                       -秒杀是否成功的结果还没有,直接返回了(返回任务id)
                       -前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完                     成(带着任务id查)
                       -如果是未执行状态,或者执行中---》返回给前端,前端不处理,定时任务继续执行
                       -又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功
         

     

     

    3 高级使用之延时任务

    # celery执行延时任务
    ## 第一种方式:2021年1月7日17点3分12秒发送短信
    # from datetime import datetime
    # # # eta:延迟多长时间执行,eta需要传时间对象,并且是utc时间
    # v1 = datetime(2021, 1, 7, 17, 3, 12)
    # print(v1)
    # v2 = datetime.utcfromtimestamp(v1.timestamp())
    # print(v2)
    # res=user_task.send_sms.apply_async(args=['18977654332',],eta=v2)


    ## 第二种方式:隔几秒后执行
    from datetime import datetime
    from datetime import timedelta
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())

    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    print(task_time)
    res=user_task.send_sms.apply_async(args=['18977654332',],eta=task_time)

     

    4 高级使用之定时任务

    # 在celery.py中配置

    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False

    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab

    app.conf.beat_schedule = {
       'send-msg':{
           'task': 'celery_task.user_task.send_sms',
           # 'schedule': timedelta(hours=24*10),
           # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
           'schedule': crontab(hour=8, day_of_month=1),  # 每月一号早八点
           'args': ('18964352112',),
      }
    }


    # 启动beat,负责每隔3s提交一个任务
    celery beat -A celery_task -l info
    # 启动worker
    celery worker -A celery_task -l info -P eventlet

     

    5 django中使用celery

    1 celery是独立的,跟框架没有关系
    2 django-celery第三方模块,兼容性不好,咱们不采用,咱们使用通用方式
    3 目录
    celery_task
      __init__.py
           celery.py
           home_task.py
           order_task.py
           user_task.py
    luffyapi

    5.1 home_task.py

    from celery_task.celery import app

    @app.task
    def update_banner():
      from django.core.cache import cache
      from django.conf import settings
      from home import models
      from home import serializer
      banners=models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]

      ser = serializer.BannerModelSerializer(instance=banners,many=True)
      banner_data=ser.data

      # 拿不到request对象,所以头像的连接base_url要自己组装
      for banner in banner_data:
          banner['img'] = 'http://127.0.0.1:8000%s' % banner['img']


      cache.set('banner_data',banner_data)

      return True

    5.2 celery.py

    import celery

    import os
    # 执行django配置文件,环境变量加入
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")




    broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db

    backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db

    app=celery.Celery('test',broker=broker,backend=backend,
                     include=['celery_task.order_task','celery_task.user_task','celery_task.home_task']
                    )



    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False

    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab

    app.conf.beat_schedule = {

       # 'send-msg':{
       #     'task': 'celery_task.user_task.send_sms',
       #     # 'schedule': timedelta(hours=24*10),
       #     # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
       #     'schedule': crontab(hour=8, day_of_month=1), # 每月一号早八点
       #     'args': ('18964352112',),
       # }
       'update-banner':{
           'task': 'celery_task.home_task.update_banner',
           'schedule': timedelta(seconds=10),
           'args': (),
      }
    }

    views.py

    from celery_task import user_task
    from celery_task.celery import app
    from celery.result import AsyncResult


    def test_celery(request):
       res_id = request.GET.get('id')
       if res_id:
           res = AsyncResult(id=res_id, app=app)
           if res.successful():
               result = res.get()
               print(result)
               return HttpResponse('执行完成了,结果是:%s' % result)

       res = user_task.send_sms.delay('18276345221')
       return HttpResponse('任务号是:%s' % str(res))

    6 首页轮播图定时更新

    1 把首页轮播图接口改成,先去缓存中取,缓存中没有,再去数据库查
    2 首页轮播图加入了缓存
    3 以后,如果你的接口,请求慢,第一反应就是先使用缓存


    4 使用celery定时更新缓存
    class BannerView(GenericViewSet, ListModelMixin):
       queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]
       serializer_class = serializer.BannerModelSerializer

       # 改成,先从缓存中取,缓存中如果有,直接返回,没有,再去数据库查

       def list(self, request, *args, **kwargs):
           # 如果缓存中有值,直接取出来返回,速度很快
           banner_data = cache.get('banner_data')
           if banner_data:

               print('走了缓存')
               return Response(data=banner_data)

           # 如果缓存中没有,再走数据,查出来,放到缓存中
           res = super().list(request, *args, **kwargs)
           # 把首页轮播图数据放到缓存中
           cache.set('banner_data', res.data)
           print('没走缓存')
           return res

     

     

  • 相关阅读:
    七牛云李意扬:如何收集 Go 实时覆盖率丨ECUG Meetup 回顾
    OpenTelemetry 微服务链路追踪
    空接口
    安全规则集合
    采用最快回应
    Golang单元测试实战
    源码 kratos 配置热加载分析
    烟花 光影
    控制Repeater显示列数
    基本代码安全知识
  • 原文地址:https://www.cnblogs.com/usherwang/p/14248231.html
Copyright © 2011-2022 走看看