zoukankan      html  css  js  c++  java
  • celery的基础使用等相关内容-125

    1 celery基本使用

    1 写一个py文件
       import celery
       # 消息中间件(redis)
       broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db
       # 结果存储(redis)
       backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db
       # 实例化得到对象,指定中间件和结果存储
       app=celery.Celery('test',broker=broker,backend=backend)
       @app.task
       def add(a,b):
           return a+b
       @app.task
       def mul(a,b):
           return a*b
       
    2 提交任务(在其它文件中)
       from t_celery import add, mul
       res=add.delay(100,4)
       print(res)  # id号
       
    3 启动worker
    # 非windows平台:celery worker -A t_celery -l info
       # windows装eventlet:celery worker -A t_celery -l info -P eventlet
       
    4 查看执行结果
    from t_celery import app
       from celery.result import AsyncResult
       # 关键字,变量不能定义为关键字
       id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00'
       if __name__ == '__main__':
           res = AsyncResult(id=id, app=app)
           if res.successful():
               result = res.get()
               print(result)
           elif res.failed():
               print('任务失败')
           elif res.status == 'PENDING':
               print('任务等待中被执行')
           elif res.status == 'RETRY':
               print('任务异常后正在重试')
           elif res.status == 'STARTED':
               print('任务已经开始被执行')

     

    2 celery多任务结构

    package_celery:     # 项目名
       celery_task     # celery包名
           __init__.py
           celery.py   # celery 的app,必须叫celery
           order_task.py # 任务
           user_task.py  # 任务
       result.py         # 结果查询
       submit_tast.py    # 提交任务
       
       
    # 运行worker(在package_celery目录下执行)
    celery worker -A celery_task -l info -P eventlet
    # 提交任务
       from celery_task import order_task,user_task
       # 提交一个给用户发短信的任务
       res=user_task.send_sms.delay('18723454566')
       print(res)
       # 提交一个取消订单任务
       res=order_task.cancel_order.delay()
       print(res)

       
    # 真实应用场景
    -秒杀系统
      -不能秒超,使用锁(mysql悲观锁,乐观锁),redis锁
           -提高并发量---》把同步做成异步---》使用celery
          -前端点击秒杀按钮,向后端发送秒杀请求---》同步操作
              -同步操作
                       -请求来到后端,判断数量是否够,如果够,要生成订单(mysql),订单状态是待支付状
                       -请求返回,告诉前端,秒杀成功
                   -异步操作
                  -请求来到后端,提交一个celery任务---》celery任务异步的执行判断数量是否够,如 果够,要生成订单(mysql)
                       -秒杀是否成功的结果还没有,直接返回了(返回任务id)
                       -前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完                     成(带着任务id查)
                       -如果是未执行状态,或者执行中---》返回给前端,前端不处理,定时任务继续执行
                       -又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功
         

     

     

    3 高级使用之延时任务

    # celery执行延时任务
    ## 第一种方式:2021年1月7日17点3分12秒发送短信
    # from datetime import datetime
    # # # eta:延迟多长时间执行,eta需要传时间对象,并且是utc时间
    # v1 = datetime(2021, 1, 7, 17, 3, 12)
    # print(v1)
    # v2 = datetime.utcfromtimestamp(v1.timestamp())
    # print(v2)
    # res=user_task.send_sms.apply_async(args=['18977654332',],eta=v2)


    ## 第二种方式:隔几秒后执行
    from datetime import datetime
    from datetime import timedelta
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())

    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    print(task_time)
    res=user_task.send_sms.apply_async(args=['18977654332',],eta=task_time)

     

    4 高级使用之定时任务

    # 在celery.py中配置

    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False

    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab

    app.conf.beat_schedule = {
       'send-msg':{
           'task': 'celery_task.user_task.send_sms',
           # 'schedule': timedelta(hours=24*10),
           # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
           'schedule': crontab(hour=8, day_of_month=1),  # 每月一号早八点
           'args': ('18964352112',),
      }
    }


    # 启动beat,负责每隔3s提交一个任务
    celery beat -A celery_task -l info
    # 启动worker
    celery worker -A celery_task -l info -P eventlet

     

    5 django中使用celery

    1 celery是独立的,跟框架没有关系
    2 django-celery第三方模块,兼容性不好,咱们不采用,咱们使用通用方式
    3 目录
    celery_task
      __init__.py
           celery.py
           home_task.py
           order_task.py
           user_task.py
    luffyapi

    5.1 home_task.py

    from celery_task.celery import app

    @app.task
    def update_banner():
      from django.core.cache import cache
      from django.conf import settings
      from home import models
      from home import serializer
      banners=models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]

      ser = serializer.BannerModelSerializer(instance=banners,many=True)
      banner_data=ser.data

      # 拿不到request对象,所以头像的连接base_url要自己组装
      for banner in banner_data:
          banner['img'] = 'http://127.0.0.1:8000%s' % banner['img']


      cache.set('banner_data',banner_data)

      return True

    5.2 celery.py

    import celery

    import os
    # 执行django配置文件,环境变量加入
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")




    broker='redis://127.0.0.1:6379/1'     # 1 表示使用redis 1 这个db

    backend='redis://127.0.0.1:6379/2'   # 2 表示使用redis 2 这个db

    app=celery.Celery('test',broker=broker,backend=backend,
                     include=['celery_task.order_task','celery_task.user_task','celery_task.home_task']
                    )



    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False

    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab

    app.conf.beat_schedule = {

       # 'send-msg':{
       #     'task': 'celery_task.user_task.send_sms',
       #     # 'schedule': timedelta(hours=24*10),
       #     # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
       #     'schedule': crontab(hour=8, day_of_month=1), # 每月一号早八点
       #     'args': ('18964352112',),
       # }
       'update-banner':{
           'task': 'celery_task.home_task.update_banner',
           'schedule': timedelta(seconds=10),
           'args': (),
      }
    }

    views.py

    from celery_task import user_task
    from celery_task.celery import app
    from celery.result import AsyncResult


    def test_celery(request):
       res_id = request.GET.get('id')
       if res_id:
           res = AsyncResult(id=res_id, app=app)
           if res.successful():
               result = res.get()
               print(result)
               return HttpResponse('执行完成了,结果是:%s' % result)

       res = user_task.send_sms.delay('18276345221')
       return HttpResponse('任务号是:%s' % str(res))

    6 首页轮播图定时更新

    1 把首页轮播图接口改成,先去缓存中取,缓存中没有,再去数据库查
    2 首页轮播图加入了缓存
    3 以后,如果你的接口,请求慢,第一反应就是先使用缓存


    4 使用celery定时更新缓存
    class BannerView(GenericViewSet, ListModelMixin):
       queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-order')[:settings.BANNER_SIZE]
       serializer_class = serializer.BannerModelSerializer

       # 改成,先从缓存中取,缓存中如果有,直接返回,没有,再去数据库查

       def list(self, request, *args, **kwargs):
           # 如果缓存中有值,直接取出来返回,速度很快
           banner_data = cache.get('banner_data')
           if banner_data:

               print('走了缓存')
               return Response(data=banner_data)

           # 如果缓存中没有,再走数据,查出来,放到缓存中
           res = super().list(request, *args, **kwargs)
           # 把首页轮播图数据放到缓存中
           cache.set('banner_data', res.data)
           print('没走缓存')
           return res

     

     

  • 相关阅读:
    sqlhelper使用指南
    大三学长带我学习JAVA。作业1. 第1讲.Java.SE入门、JDK的下载与安装、第一个Java程序、Java程序的编译与执行 大三学长带我学习JAVA。作业1.
    pku1201 Intervals
    hdu 1364 king
    pku 3268 Silver Cow Party
    pku 3169 Layout
    hdu 2680 Choose the best route
    hdu 2983
    pku 1716 Integer Intervals
    pku 2387 Til the Cows Come Home
  • 原文地址:https://www.cnblogs.com/usherwang/p/14248231.html
Copyright © 2011-2022 走看看