zoukankan      html  css  js  c++  java
  • Celery使用

    Celery实现了分布式任务队列的功能,提供异步执行,定时任务两个特性。应用首先将任务封装后发送到Broker,Celery启动多个Worker从Broker中获取任务并执行,通过Broker这一层实现异步特性;Celery提供Beat调度器进行定时任务的调度执行,从而实现定时任务功能。

    基本架构图如下:

    基本使用

    • 安装

      pipenv install celery
      
    • 单文件使用

      # vim tasks.py
      
      import time
      from celery import Celery
      
      broker = 'redis://192.168.2.128:6379/1'
      backend = 'redis://192.168.2.128:6379/2'
      app = Celery('my_task', broker=broker, backend=backend)
      
      
      @app.task
      def add(x, y):
          print('enter call func ...')
          time.sleep(4)
          return x + y
      

      add任务在注册时的名字为tasks.add,调用时需要from tasks import add然后才能执行成功

      # vim app.py
      
      from tasks import add
      print('start task ...')
      result = add.delay(2, 8)
      print('end task ...')
      

      如果在tasks.py下执行print(add)得到结果<@task: my_task.add of my_task at 0x101fe61d0>

      如果在app.py下导入add后执行print(add)得到结果<@task: tasks.add of my_task at 0x103135c18>

      所以如果直接在tasks.py下执行add.delay(2, 8),虽然任务发送成功,但是

      worker会报错 Received unregistered task of type 'my_task.add'

      这里留一个疑问为什么两个任务名字会有不同?

    • 模块化使用

      # ls celery_app
      
      __init__.py  			# 创建app对象并加载配置文件
      celeryconfig.py 		# 定义app配置并进行任务注册
      task1.py  task2.py		# 任务模块
      
      # vim __init__.py
      
      from celery import Celery
      
      app = Celery('demo')
      # 通过celery 实例加载配置
      app.config_from_object('celery_app.celeryconfig')
      
      # vim celeryconfig.py
      
      from datetime import timedelta
      from celery.schedules import crontab
      
      # APP配置
      BROKER_URL = 'redis://192.168.2.128:6379/1'
      CELERY_RESULT_BACKEND = 'redis://192.168.2.128:6379/2'
      CELERY_TIMEZONE = 'Asia/Shanghai'
      
      # 任务注册
      CELERY_IMPORTS = (
          'celery_app.task1',
          'celery_app.task2',
      )
      
      # 定时任务
      CELERYBEAT_SCHEDULE = {
          'task1': {
              'task': 'celery_app.task1.add',
              'schedule': timedelta(seconds=10),
              'args': (2, 8)
          },
          'task2': {
              'task': 'celery_app.task2.multiply',
              'schedule': crontab(hour=20, minute=46),
              'args': (4, 5)
          }
      }
      
      # vim task1.py
      
      import time
      from celery_app import app
      
      @app.task
      def add(x, y):
          time.sleep(3)
          return x + y
      
      # vim task2.py
      
      import time
      from celery_app import app
      
      @app.task
      def multiply(x, y):
          time.sleep(4)
          return x * y
      
    • 启动命令

      celery worker -A celery_app.tasks -l INFO		# -A 指定app所在模块 -l 指定日志级别
      celery beat -A celery_app.tasks -l INFO
      
  • 相关阅读:
    Leetcode 811. Subdomain Visit Count
    Leetcode 70. Climbing Stairs
    Leetcode 509. Fibonacci Number
    Leetcode 771. Jewels and Stones
    Leetcode 217. Contains Duplicate
    MYSQL安装第三步报错
    .net 开发WEB程序
    JDK版本问题
    打开ECLIPSE 报failed to load the jni shared library
    ANSI_NULLS SQL语句
  • 原文地址:https://www.cnblogs.com/Peter2014/p/11628588.html
Copyright © 2011-2022 走看看