zoukankan      html  css  js  c++  java
  • day-90selery

    celery

    什么是Celery:

      处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度

    Celery架构:

      Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

       消息中间件:使用Redis

       任务执行单元:由Celery提供

       任务结果存储:使用Redis

    使用场景:

      异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

      定时任务:定时执行某件事情,比如每天数据统计

    Celery的安装配置:

      pip install enentlet
      pip install celery

    基本使用

    基本使用
    
    1.创建py文件:celery_app_task.py
    
        import celery
        import time
        broker = 'redis://127.0.0.1:6379/0'
        backend = 'redis://127.0.0.1:6379/1'
        app = celery.Celery('test',backend=backend,broker=broker)
    
        @app.task
        def add(x, y):
            time.sleep(1)
            return x + y
    
    
    2.创建py文件:add_task.py(添加任务)
    
        from celery_app_task import add
        result = add.delay(4, 5)
        print(result.id)
    
    3.终端cd到当前文件夹下执行:celery worker -A celery_app_task -l info -P eventlet
    
    
    4.创建py文件:result.py(查看任务执行结果)
    
        from celery.result import AsyncResult
        from celery_app_task import app
    
        async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=app)
    
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('执行失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    
    
    多任务结构
    
    
    pro_cel
        ├── celery_task        # celery相关文件夹
        │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
        │   └── tasks.py    #  所有任务函数
        │
        ├── check_result.py # 检查结果
        └── send_task.py    # 触发任务
    
    
    1.celery.py
    
        from celery import Celery
    
        app = Celery('celery_demo',
                     broker='redis://127.0.0.1:6379/1',
                     backend='redis://127.0.0.1:6379/2',
                     include=['celery_task.tasks1',                # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
                              'celery_task.tasks2'
                              ])
    
    
        app.conf.timezone = 'Asia/Shanghai'                        # 时区
    
        app.conf.enable_utc = False                                # 是否使用UTC
    
    
    2.tasks.py
        import time
        from celery_task.celery import app
    
        @app.task
        def test_celery1(res):
            time.sleep(5)
            return "test_celery1任务结果:%s" % res
    
        @app.task
        def test_celery2(res):
            time.sleep(5)
            return "test_celery2任务结果:%s" % res
    
    3.终端cd到当前文件夹下执行:celery worker -A celery_app_task -l info -P eventlet
    
    
    4.send_task.py
        from celery_task.tasks1 import test_celery1
        from celery_task.tasks2 import test_celery2
    
        result = test_celery1.delay()
        print(result.id)
        result = test_celery2.delay()
        print(result.id)
    
    
    5.check_result.py
    
        from celery.result import AsyncResult
        from celery_task.celery import app
    
        async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=app)
    
        if async.successful():
            result = async.get()
            print(result)
        elif async.failed():
            print('执行失败')
        elif async.status == 'PENDING':
            print('任务等待中被执行')
        elif async.status == 'RETRY':
            print('任务异常后正在重试')
        elif async.status == 'STARTED':
            print('任务已经开始被执行')
    
    
    
    Celery执行定时任务
     
    
    add_task.py
    
    from celery_app_task import add
    from datetime import datetime
    
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    
    # 使用apply_async并设定时间
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    
    
    多任务结构中celery.py修改如下
    
    from datetime import timedelta
    from celery import Celery
    from celery.schedules import crontab
    
    cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
        'celery_task.tasks1',
        'celery_task.tasks2',
    ])
    cel.conf.timezone = 'Asia/Shanghai'
    cel.conf.enable_utc = False
    
    cel.conf.beat_schedule = {
    # 名字随意命名 'add-every-10-seconds': {
    # 执行tasks1下的test_celery函数 'task': 'celery_task.tasks1.test_celery',
    # 每隔2秒执行一次 'schedule': timedelta(seconds=2),
    # 传递参数 'args': ('test',) }, # 'add-every-12-seconds': { # 'task': 'celery_task.tasks1.test_celery', # 每年4月11号,8点42分执行 # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'args': (16, 16) # }, } 启动一个beat:celery beat -A celery_task -l info 启动work执行:celery worker -A celery_task -l info -P eventlet

    django中使用celery

     

     celery与redis手动、自动提交任务

    自动提交任务

    手动提交任务

    支付宝

    工作流程图:

    支付宝使用:

    ##### 路由:order/urls.py
    
    
    ```python
    from app import views
    urlpatterns = [
        path('alipay/', views.AliPayAPIView.as_view()),
        path('aliback/', views.AliPayBackViewSet.as_view({'get': 'get', 'post': 'post'}))
    ]
    ```
    
    ##### 视图:order/views.py
    
    
    ```python
    
    from rest_framework.views import APIView
    from rest_framework.viewsets import ViewSet
    from rest_framework.response import Response
    
    import time
    from libs.alipay.payinit import ali_init
    class AliPayAPIView(APIView):
        def post(self, request, *args, **kwargs):
            # 初始化alipay
            alipay = payinit.ali_init()
    
            subject = request.data.get('subject')
            money = float(request.data.get('money'))
            # 生成支付的url
            query_params = alipay.direct_pay(
                subject=subject,  # 商品名描述
                out_trade_no="x2" + str(time.time()),  # 商户订单号
                total_amount=money,  # 交易金额(单位: 元 保留俩位小数)
            )
            # 支付宝网关,带上订单参数才有意义
            pay_url = "https://openapi.alipaydev.com/gateway.do?{}".format(query_params)
            # POST请求重定向到支付宝提供的网关,跳转到支付宝支付界面
            return Response(pay_url)
    
    
    class AliPayBackViewSet(ViewSet):
        def post(self, request, *args, **kwargs):
            # 初始化alipay
            alipay = ali_init()
    
            # 检测是否支付成功
            # 去请求体中获取所有返回的数据:状态/订单号
            from urllib.parse import parse_qs
            body_str = request.body.decode('utf-8')
            post_data = parse_qs(body_str)
    
            post_dict = {}
            for k, v in post_data.items():
                post_dict[k] = v[0]
            print(post_dict)
    
            sign = post_dict.pop('sign', None)
            # 通过调用alipay的verify方法去二次认证
            status = alipay.verify(post_dict, sign)
    
            if status:
                # 修改订单状态
                pass
            return Response('验证成功')
    
        def get(self, request, *args, **kwargs):
            # 初始化alipay
            alipay = payinit.ali_init()
    
            params = request.GET.dict()
            sign = params.pop('sign', None)
            status = alipay.verify(params, sign)
            print('GET验证', status)
            if status:
                # 获取订单状态,显示给用户
                return Response('支付成功')
    ```

    支付宝案例:

    项目上线:

      参考视频

  • 相关阅读:
    【微服务架构】SpringCloud之Ribbon
    SpringCloud之Eureka(注册中心集群篇)
    Eureka简介
    两行代码 搞定计数
    HBase详细概述
    电商项目介绍---说的很好
    面试:----Nginx的一理解
    redis介绍
    Linux操作系统介绍
    什么是Solr
  • 原文地址:https://www.cnblogs.com/klw1/p/11421630.html
Copyright © 2011-2022 走看看