zoukankan      html  css  js  c++  java
  • Python异步任务模块之-celery

    celery 简单上手使用

    1. pip install celery
    2. 并且保持你有一个可使用的redis或者rabbitmq

    teak.py

    from celery import Celery
    
    app = Celery("task",  # 与文件名无关
                 broker="redis://127.0.0.1:6379",  # 消息中间件
                 backend="redis://127.0.0.1:6379"  # 结果存放位置
                 )
    
    
    @app.task  ## 被装饰后可以执行的任务
    def add(x, y):
        print("running.....")
        return x + y
    

    启动celery work

    command: celery -A task worker -l info
    

    在task.py 文件位置打开python解释器

    >>> import task
    >>> task.add(1,2)
    running.....
    3
    >>> aa = task.add.delay(1,2)
    >>> aa
    <AsyncResult: 80691275-5575-45e3-b525-1a0994a1e7df>
    >>> aa.get()
    3
    >>>
    

    celery work 完成相关的任务:

    对于会执行很长时间的任务

    @app.task 
    def add(x, y):
        print("running.....")
        import time
        time.sleep(30)
        return x + y
    
    1. 此时在程序中任务未被work执行完成时,消费者去get会被hang住,我们可以使用 ready() 方法来判断是否结束:aa.ready() 会返回一个bool值
    2. 还可以在使用get方法中添加timeout参数,超时后抛出 celery.exception.TimeoutEroor 异常
    3. 具体的任务异常了(出错),也会导致get方法抛出异常
    4. 为了将work的异常获取到作为返回值,可以使用aa.get(propagate=False) 来获取异常内容,而详细的异常会保存在aa.traceback 中,这样不会导致取结果消费者的程序出问题

    在项目中使用 celery

    项目目录结构:

    CeleryPro
    ├── CeleryPro
    │   ├── __init__.py
    │   ├── __pycache__
    │   ├── celery.py
    │   ├── settings.py
    │   ├── tasks.py
    │   ├── tasks_2.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── manage.py
    └── templates
    

    celery.py

    # python默认从当前路径导入,受用 absolute_import 来导入安装目录中的 celery
    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    
    app = Celery("Celery",  ## 你的celery项目名称
                 broker='redis://127.0.0.1:6379/',
                 backend='redis://127.0.0.1:6379/',
                 include=["CeleryPro.tasks", "CeleryPro.tasks_2", ""]
                 )
    
    app.conf.update(
        result_expires=3600  # 任务结果存储在消息中间件中的expires时间
    )
    
    if __name__ == '__main__':
        app.start()
    
    

    task.py

    from __future__ import absolute_import, unicode_literals
    from .celery import app
    
    
    @app.task
    def add(x, y):
        return x + y
    
    
    @app.task
    def mul(x, y):
        return x * y
    
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    
    

    启动方法

    # 前台启动:
    command: celery -A CeleryPro worker -l info
    # 后台启动多个 worker:
    command: celery multi start  worker1 -A  CeleryPro worker -l info 
    # 后台启动的 worker 会创建相关的 【wokrer名称.log】【wokrer名称.pid】
    # 多个 worker 可以启动在多个机器上,只要保持消息中间件唯一
    

    其他方法

    command: celery multi restart worker1 worker2 -A CeleryPro
    command: celery multi stop worker1 worker2 
    
    

    定时任务

    目录结构

    ├── CeleryPro
    │   ├── __init__.py
    │   ├── celery.py
    │   ├── periodic_tasks.py  # 定时任务文件
    │   ├── settings.py
    │   ├── tasks.py
    │   ├── tasks_2.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── manage.py
    └── templates
    

    periodic_tasks.py

    from __future__ import absolute_import, unicode_literals
    from celery.schedules import crontab
    from .celery import app
    
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        sender.add_periodic_task(10.0, test.s("Hello"), name="10s_tasks")
        sender.add_periodic_task(30.0, test.s("World"), name="30s_tasks")
        sender.add_periodic_task(
            crontab(hour=21, minute=58, day_of_week=7),
            test.s("Happy monday")
        )
    
    
    @app.task
    def test(arg):
        print(arg)
    
    

    启动

    # 启动 worker,在启动之前,需要将新的任务调度文件注册到 celery 的include中
    ---------------------------------------
    app = Celery("Celery",  ## 你的celery项目名称
                 broker='redis://127.0.0.1:6379/',
                 backend='redis://127.0.0.1:6379/',
                 include=["CeleryPro.tasks", "CeleryPro.tasks_2", "CeleryPro.periodic_tasks"]
                 )
    ---------------------------------------
    command: celery -A CeleryPro worker -l info
    
    # 启动 beat
    command: celery -A CeleryPro.periodic_tasks beat -l info  # 需要指定项目中的具体任务文件
    
    

    调度测试结果


    动态添加定时调度任务,通过这种方式,我们可以按照参数的方式给传入新的定时任务

    celery.py

    app.conf.beat_schedule = {
        "add_every_5_seconds":{
            'task':'CeleryPro.tasks.add',
            'schedule': 5.5,
            'args':(1,2)
        }
    }
    app.conf.timezone = 'UTC'
    

    更多文档可以查看:点击

    在 Django 项目中使用 celery 文档:

    Django 目录结构

    CeleryPro # 项目
    ├── CeleryPro  
    │   ├── __init__.py
    │   ├── __pycache__
    │   ├── celery.py  
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── app01   ## app 
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tasks.py   # 必须要为 tasks
    │   ├── tests.py
    │   └── views.py
    ├── manage.py
    └── templates
    

    celery.py

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    # 即如果需要在自己的脚本中访问 django 项目的相关,models需要如下配置
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryPro.settings')
    
    
    app = Celery('proj')
    
    # Using a string here means the worker don't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    # 当 celery 与 django 结合后,你的 selery 使用的消息中间件需要配置在 django 的项目配置中
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    # 到各个 app 中自动发现 tasks 文件
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

    __init__.py

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ['celery_app']
    
    

    app01/tasks.py

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    # __author__ = "Leon"
    # Date: 2019/5/26
    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    
    @shared_task  ## 所有的 app 下都可以调用
    def add(x, y):
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    

    settings.py

    CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
    

    urls.py

    from django.conf.urls import url
    from django.contrib import admin
    from app01 import views
    
    urlpatterns = [
        url(r'^admin/', admin.site.urls),
        url(r'^AsyncCeleryCall', views.AsyncCeleryCall),
        url(r'^AsyncCeleryGet', views.AsyncCeleryGet),
    ]
    

    app01/views.py

    from django.shortcuts import render, HttpResponse
    
    # Create your views here.
    from app01 import tasks
    import random
    
    
    def AsyncCeleryCall(request):
        """
        
        :param request:
        :return:
        """
        if request.method == "GET":
            a = random.randint(1, 1000)
            t = tasks.add.delay(a, 6)
            print(a)
            return HttpResponse(t.id)
    
        elif request.method == "POST":
            pass
    
        else:
            pass
    
    
    def AsyncCeleryGet(request):
        """
    
        :param request:
        :return:
        """
        from celery.result import AsyncResult
        if request.method == "GET":
            task_id = request.GET.get('id')
            res = AsyncResult(id=task_id)
            if res.ready():
                return HttpResponse(res.get())
            else:
                return HttpResponse(res.ready())
    
        elif request.method == "POST":
            pass
    
        else:
            pass
    
    

    测试

    触发任务

    获取结果

    Django定时任务

    安装新的模块pip install django-celery-beat

    在 settings 中加入 app

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        "django_celery_beat"  ## here
    ]
    

    创建模块相关的数据库表

    #: python manage.py migrate
    
    Operations to perform:
      Apply all migrations: admin, auth, contenttypes, django_celery_beat, sessions
    Running migrations:
      Applying django_celery_beat.0001_initial... OK
      Applying django_celery_beat.0002_auto_20161118_0346... OK
      Applying django_celery_beat.0003_auto_20161209_0049... OK
      Applying django_celery_beat.0004_auto_20170221_0000... OK
      Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
      Applying django_celery_beat.0006_auto_20180322_0932... OK
      Applying django_celery_beat.0007_auto_20180521_0826... OK
      Applying django_celery_beat.0008_auto_20180914_1922... OK
      Applying django_celery_beat.0006_auto_20180210_1226... OK
      Applying django_celery_beat.0006_periodictask_priority... OK
      Applying django_celery_beat.0009_periodictask_headers... OK
      Applying django_celery_beat.0010_auto_20190429_0326... OK
      Applying django_celery_beat.0011_auto_20190508_0153... OK
    

    创建一个 admin 的新的超级用户

    python manage.py createsuperuser
    

    打开 django-admin后台

    创建新的定时规则


    创建任务与规则的关联,并设定相关的传入参数以及配置


    启动 worker

    command: celery -A CeleryPro worker -l info
    

    启动 beat,每次新添加任务,需要重启 beat

    celery -A CeleryPro beat -l info -S django
    

    结果:


    本次代码地址

    下载

  • 相关阅读:
    Docker容器启动时初始化Mysql数据库
    使用Buildpacks高效构建Docker镜像
    Mybatis 强大的结果集映射器resultMap
    Java 集合排序策略接口 Comparator
    Spring MVC 函数式编程进阶
    换一种方式编写 Spring MVC 接口
    【asp.net core 系列】6 实战之 一个项目的完整结构
    【asp.net core 系列】5 布局页和静态资源
    【asp.net core 系列】4. 更高更强的路由
    【Java Spring Cloud 实战之路】- 使用Nacos和网关中心的创建
  • 原文地址:https://www.cnblogs.com/forsaken627/p/10925714.html
Copyright © 2011-2022 走看看