zoukankan      html  css  js  c++  java
  • Celery的任务分发与定时任务

    celery应用场景

    • celery,处理任务的Python的模块。

      • 场景1:

        对【耗时的任务】,通过celery,将任务添加到broker(队列),然后立即给用户返回一个任务ID。
        当任务添加到broker之后,由worker去broker获取任务并处理任务。
        任务完完成之后,再将结果放到backend中
        
        用户想要检查结果,提供任务ID,我们就可以去backend中去帮他查找。
        
      • 场景2:

        定时任务(定时发布/定时拍卖)
        

    celery的使用

    Celery是由Python开发的一个简单、灵活、可靠的处理大量任务的分发系统,它不仅支持实时处理,也支持任务调度。

    支持多个broker和worker来实现高可用和分布式。

    将一些耗时的任务 扔到broker队列中,并且会返回一个任务ID,可以通过任务ID去backend队列中获取结果。 worker从broker获取任务去执行,并将结果返回到backend队列中。
    
    函数名、参数 传入broker
    

    1.1 环境的搭建

    pip3 install celery==4.4
    安装broker: redis或rabbitMQ
    pip3 install redis / pika
    

    1.2 快速使用

    • s1.py

      from celery import Celery
      
      app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
      
      @app.task	# 说明这个函数可以被当作celery的任务
      def x1(x, y):
          return x + y
      
      @app.task
      def x2(x, y):
          return x - y
      
      
    • s2.py

      from s1 import x1
      
      # 立即告诉celery去创建并执行x1任务,并传两个参数
      result = x1.delay(4, 4)
      print(result.id)	# 任务ID
      
      
    • s3.py 获取任务结果

      from celery.result import AsyncResult
      from s1 import app
      
      result_object = AsyncResult(id="任务ID", app=app)
      print(result_object.status)	# 任务状态
      
      data = result_object.get()	# 获取任务结果
      print(data)
      
    任务超时限制

    避免某些任务一直处于非正常的进行中状态,阻塞队列中的其他任务。应该为任务执行设置超时时间。如果任务超时未完成,则会将 Worker 杀死,并启动新的 Worker 来替代。

    @app.task(time_limit=1800)	# 可以设置任务超时限制
    

    运行程序:

    1. 启动redis

    2. 启动worker

      # 首先要进入当前目录
      celery worker -A s1 -l info
      
      # -A s1	找到项目
      # -l info	是打印日志log,代码上线时不加info
      
      windows下会报一个错:
      
      Traceback (most recent call last):
        File "d:wupeiqipy_virtual_envsauctionlibsite-packagesilliardpool.py", line 362, in workloop
          result = (True, prepare_result(fun(*args, **kwargs)))
        File "d:wupeiqipy_virtual_envsauctionlibsite-packagesceleryapp	race.py", line 546, in _fast_trace_task
          tasks, accept, hostname = _loc
      ValueError: not enough values to unpack (expected 3, got 0)
      
      
      解决安装:
      pip install eventlet
      
      celery worker -A s1 -l info -P eventlet
      
    3. 创建任务,放入broker

      python s2.py
      python s2.py
      
    4. 查看任务状态

      # 在s3.py填写任务ID
      ptyhon s3.py 
      

      取消任务

      from s1 import app
      from celery.app.control import Control
      
      celery_control = Control(app=app) 
      celery_control.revoke(id, terminate=True) 
      

    1.3 django中应用celery

    在Django中用Django-celery。

    # pip3 install django-celery (没有用到,还是用的celery模块)
    

    之后,需要按照django-celery的要求进行编写代码。

    • 第一步:【项目/项目/settings.py 】添加配置

      CELERY_BROKER_URL = 'redis://192.168.16.85:6379'
      CELERY_ACCEPT_CONTENT = ['json']
      CELERY_RESULT_BACKEND = 'redis://192.168.16.85:6379'
      CELERY_TASK_SERIALIZER = 'json'
      # CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24   # 任务过期时间
      

      Celery 配置参数汇总

      配置项 说明
      CELERY_DEFAULT_QUEUE 默认队列
      CELERY_BROKER_URL Broker 地址
      CELERY_RESULT_BACKEND 结果存储地址
      CELERY_TASK_SERIALIZER 任务序列化方式
      CELERY_RESULT_SERIALIZER 任务执行结果序列化方式
      CELERY_TASK_RESULT_EXPIRES 任务过期时间
      CELERY_ACCEPT_CONTENT 指定任务接受的内容类型(序列化)
    • 第二步:【项目/项目/celery.py】在项目同名目录创建 celery.py

      #!/usr/bin/env python
      # -*- coding:utf-8 -*-
      
      import os
      from celery import Celery
      
      # set the default Django settings module for the 'celery' program.
      
      # celery指定的配置文件 
      os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demos.settings')
      
      app = Celery('demos')	# 名字随便起,省略了broker配置,配置文件已配置
      
      # Using a string here means the worker doesn't have to serialize
      # the configuration object to child processes.
      # - namespace='CELERY' means all celery-related configuration keys
      #   should have a `CELERY_` prefix.
      # 所有的celery配置都以CELERY开头
      app.config_from_object('django.conf:settings', namespace='CELERY')
      
      # Load task modules from all registered Django app configs.
      # 去每个已注册app中读取 tasks.py 文件
      app.autodiscover_tasks()
      
    • 第三步,【项目/app名称/tasks.py】

      from celery import shared_task
      
      @shared_task
      def add(x, y):
          return x + y
      
      @shared_task
      def mul(x, y):
          return x * y
      
    • 第四步,【项目/项目/__init__.py

      from .celery import app as celery_app
      
      __all__ = ('celery_app',)	
      
    • 启动worker

      # 首先进入项目目录
      
      celery worker -A demos -l info -P eventlet
      
    • 编写视图函数,调用celery去创建任务。

      • url

        # from api.views import task
        
        url(r'^create/task/$', task.create_task),
        url(r'^get/result/$', task.get_result),
        
      • 视图函数

        from django.shortcuts import HttpResponse
        from api.tasks import x1
        from celery.result import AsyncResult
        
        # from demos.celery import app
        from demos import celery_app
        
        def create_task(request):
            print('请求来了')
            result = x1.delay(2,2)	# 添加x1任务,并返回任务ID
            print('执行完毕')
            return HttpResponse(result.id) 
        
        
        def get_result(request):
            nid = request.GET.get('nid')  
            result_object = AsyncResult(id=nid, app=celery_app)
            # print(result_object.status)
            data = result_object.get()	# 获取数据
            return HttpResponse(data)
        
    • 启动django程序

      python manage.py ....
      

    1.4 celery定时执行

    from app01 import tasks
    from celery.result import AsyncResult
    
    def time_task(request):
        """
        定时执行
        :param request:
        :return:
        """
        # 获取本地时间
        ctime = datetime.datetime.now()
        # 转换成utc时间
        utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
    
        s10 = datetime.timedelta(seconds=60)    # 60s后执行
        ctime_x = utc_ctime + s10   # 执行的时间
    
        # 使用apply_async并设定时间
        result = tasks.mul.apply_async(args=[2, 5], eta=ctime_x)
        return HttpResponse(result.id)
    
    
    def time_result(request):
        nid = request.GET.get('nid')
        from celery.result import AsyncResult
        # from demos.celery import app
        from celerytest import celery_app
        result_object = AsyncResult(id=nid, app=celery_app)
        # print(result_object.status)  # 获取状态
        # data = result_object.get()  # 获取数据
        # result_object.forget()  # 把数据在backend中移除
        # result_object.revoke(terminate=True)  # 取消任务terminate=True强制取消
    	
        # 通过状态绝对返回方式
        if result_object.successful():
            # 成功
            data = result_object.get()
            result_object.forget()
        elif result_object.failed():
            # 失败
            data = '执行失败!'
        else:
            data = '执行中!'
        return HttpResponse(data)
    

    支持的参数 :

    • countdown : 等待一段时间再执行.

      tasks.add.apply_async((2,3), countdown=5)
      
    • eta : 定义任务的开始时间.

      tasks.add.apply_async((2,3), eta=now+tiedelta(second=10))
      
    • expires : 设置超时时间.

      tasks.add.apply_async((2,3), expires=60)
      
    • retry : 定时如果任务失败后, 是否重试.

      tasks.add.apply_async((2,3), retry=False)
      
    • retry_policy : 重试策略.

      • max_retries : 最大重试次数, 默认为 3 次.
      • interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.
      • interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2
      • interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .

    1.5 周期性定时任务

    • celery
    • django中也可以结合使用

    task与shared_task装饰器的区别:

    task是通过创建的Celery对象进行调用
    例如:
        app1 = Celery('tasks', broker='redis://192.168.16.48:6379',)
        app2 = Celery('tasks', broker='redis://192.168.16.48:6379',)
        
        @app1.task
        def x1(x, y):
            return x - y
            
        @app2.task
        def x2(x, y):
            return x * y
        
        
    多用于单一文件中,不用直接加载到内存。当有多个Celery对象时,任务函数可以明确使用某一个来装饰。
    
    shared_task多用与多个文件使用celery,一般在celery.py中只创建一个Celery对象,例如Django集成celery。在项目启动时,会将celery对象加载到内存,而@shared_task会自动将写在各个应用下task.py的任务交与内存中的Celery对象,复用性较强。
    例如:
        @shared_task
        def x1(x, y):
            return x - y
            
    	from web import tasks
    	tasks.x1.delay(1,5)
    
    但当在celery.py中创建多个Celery对象时,不同任务使用不同的对象,这个时候就需要指明对象名。
    例如:
    	from web import tasks
    	
    	app1.tasks.x1.delay(1,5)
    	app2.tasks.x2.delay(1,5)
    

    1.6 任务绑定,记录日志,重试

    # 修改 tasks.py 文件.
     
    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)
     
    @app.task(bind=True)
    def div(self, x, y):
        logger.info(('Executing task id {0.id}, args: {0.args!r}'
                     'kwargs: {0.kwargs!r}').format(self.request))
        try:
            result = x/y
        except ZeroDivisionError as e:
            raise self.retry(exc=e, countdown=5, max_retries=3)     # 发生 ZeroDivisionError 错误时, 每 5s 重试一次, 最多重试 3 次.
     
        return result
    

    当使用 bind=True 参数之后, 函数的参数发生变化, 多出了参数 self, 这这相当于把 div 编程了一个已绑定的方法, 通过 self 可以获得任务的上下文.

    1.7 启用任务监控

    Flower 是 Celery 官方推荐的实时监控工具,用于监控 Tasks 和 Workers 的运行状态。Flower 提供了下列功能:

    • 查看 Task 清单、历史记录、参数、开始时间、执行状态等
    • 撤销、终止任务
    • 查看 Worker 清单、状态
    • 远程开启、关闭、重启 Worker 进程
    • 提供 HTTP API,方便集成到运维系统

    相比查看日志,Flower 的 Web 界面会显得更加友好。

    Flower 的 supervisor 管理配置文件:

    [program:flower]
    command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555 
    directory=/opt/PyProjects/app
    autostart=true
    autorestart=true
    startretries=3 
    user=derby
    stdout_logfile=/var/logs/%(program_name)s.log
    stdout_logfile_maxbytes=50MB
    stdout_logfile_backups=30
    stderr_logfile=/var/logs/%(program_name)s-error.log
    stderr_logfile_maxbytes=50MB
    stderr_logfile_backups=3
    

    celery面试总结

    1. Celery是一个由python开发的一个简单、灵活、可靠的,能够处理大量任务的系统,可以做任务的分发,也能够做定时任务。多用于耗时的操作。例如发送短信、邮箱这些功能就能使用Celery做任务分发。
    2. @shared_task/@task装饰的函数说明是这一个celery任务,会添加broker中。
    3. 函数名.delay(参数),会去调用且执行任务,并且返回任务ID。
    4. 可以根据任务ID,去backend获取任务状态、结果。AsyncResult(id=任务ID, app=celery_app).get()获取任务的结果;
    5. apply_async(args=[参数],eta)设置定时执行任务,eta是定时任务的执行时间(utc时间)。
    6. revoke()可以取消任务。
    
  • 相关阅读:
    一阶倒立摆分析
    用Matlab进行部分分式展开
    2013/07/11 中科院软件所就业讲座总结
    解决vs2010“创建或打开C++浏览数据库文件 发生错误”的问题 Microsoft SQL Server Compact 3.5
    centos安装
    Mongodb——GridFS
    MongoDB—— 写操作 Core MongoDB Operations (CRUD)
    MongoDB—— 读操作 Core MongoDB Operations (CRUD)
    数据库
    影像数据库调研
  • 原文地址:https://www.cnblogs.com/yzm1017/p/13570843.html
Copyright © 2011-2022 走看看