zoukankan      html  css  js  c++  java
  • Celery使用

    Celery实现了分布式任务队列的功能,提供异步执行,定时任务两个特性。应用首先将任务封装后发送到Broker,Celery启动多个Worker从Broker中获取任务并执行,通过Broker这一层实现异步特性;Celery提供Beat调度器进行定时任务的调度执行,从而实现定时任务功能。

    基本架构图如下:

    基本使用

    • 安装

      pipenv install celery
      
    • 单文件使用

      # vim tasks.py
      
      import time
      from celery import Celery
      
      broker = 'redis://192.168.2.128:6379/1'
      backend = 'redis://192.168.2.128:6379/2'
      app = Celery('my_task', broker=broker, backend=backend)
      
      
      @app.task
      def add(x, y):
          print('enter call func ...')
          time.sleep(4)
          return x + y
      

      add任务在注册时的名字为tasks.add,调用时需要from tasks import add然后才能执行成功

      # vim app.py
      
      from tasks import add
      print('start task ...')
      result = add.delay(2, 8)
      print('end task ...')
      

      如果在tasks.py下执行print(add)得到结果<@task: my_task.add of my_task at 0x101fe61d0>

      如果在app.py下导入add后执行print(add)得到结果<@task: tasks.add of my_task at 0x103135c18>

      所以如果直接在tasks.py下执行add.delay(2, 8),虽然任务发送成功,但是

      worker会报错 Received unregistered task of type 'my_task.add'

      这里留一个疑问为什么两个任务名字会有不同?

    • 模块化使用

      # ls celery_app
      
      __init__.py  			# 创建app对象并加载配置文件
      celeryconfig.py 		# 定义app配置并进行任务注册
      task1.py  task2.py		# 任务模块
      
      # vim __init__.py
      
      from celery import Celery
      
      app = Celery('demo')
      # 通过celery 实例加载配置
      app.config_from_object('celery_app.celeryconfig')
      
      # vim celeryconfig.py
      
      from datetime import timedelta
      from celery.schedules import crontab
      
      # APP配置
      BROKER_URL = 'redis://192.168.2.128:6379/1'
      CELERY_RESULT_BACKEND = 'redis://192.168.2.128:6379/2'
      CELERY_TIMEZONE = 'Asia/Shanghai'
      
      # 任务注册
      CELERY_IMPORTS = (
          'celery_app.task1',
          'celery_app.task2',
      )
      
      # 定时任务
      CELERYBEAT_SCHEDULE = {
          'task1': {
              'task': 'celery_app.task1.add',
              'schedule': timedelta(seconds=10),
              'args': (2, 8)
          },
          'task2': {
              'task': 'celery_app.task2.multiply',
              'schedule': crontab(hour=20, minute=46),
              'args': (4, 5)
          }
      }
      
      # vim task1.py
      
      import time
      from celery_app import app
      
      @app.task
      def add(x, y):
          time.sleep(3)
          return x + y
      
      # vim task2.py
      
      import time
      from celery_app import app
      
      @app.task
      def multiply(x, y):
          time.sleep(4)
          return x * y
      
    • 启动命令

      celery worker -A celery_app.tasks -l INFO		# -A 指定app所在模块 -l 指定日志级别
      celery beat -A celery_app.tasks -l INFO
      
  • 相关阅读:
    七牛云上传图片
    找到当前字符串中最后一个/并获取之后的字符串
    jquery正则表达式验证:验证身份证号码
    apply()与call()的区别
    js 判断字符串是否包含某字符串,String对象中查找子字符,indexOf
    改变父元素的透明度,不影响子元素的透明度—css
    c实现生产者消费者问题。 windows下。
    python基础练习 dict切片
    html+css test1
    codewars[7]-python Friend or Foe?
  • 原文地址:https://www.cnblogs.com/Peter2014/p/11628588.html
Copyright © 2011-2022 走看看