zoukankan      html  css  js  c++  java
  • Celery

    Celery

    官方

    Celery 官网:http://www.celeryproject.org/

    Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

    Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

    专注于实时处理的异步任务队列

    同时也支持任务调度

    注意:

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

    Celery异步任务框架

    """
    1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
    2)celery服务为为其他项目服务提供异步解决任务需求的
    注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

    人是一个独立运行的服务 | 医院也是一个独立运行的服务
    正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
    人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
    """

    Celery架构

    Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

    消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

    使用场景

    异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

    延迟执行:解决延迟任务

    定时执行:解决周期(周期)任务,比如每天数据统计

    Celery的安装配置

    pip install celery

    消息中间件:RabbitMQ/Redis

    app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)

    两种celery任务结构:提倡用包管理,结构更清晰

    # 如果 Celery对象:Celery(...) 是放在一个模块下的
    # 1)终端切换到该模块所在文件夹位置:scripts
    # 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet
    # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
    # 注:模块名随意
    
    
    # 如果 Celery对象:Celery(...) 是放在一个包下的
    # 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中
    # 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet
    # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info
    # 注:包名随意

    Celery执行异步任务

    基本结构

    # 创建py文件:celery_app_task.py
    import celery
    import time
    # broker='redis://127.0.0.1:6379/2' 不加密码
    backend='redis://:123456@127.0.0.1:6379/1'
    broker='redis://:123456@127.0.0.1:6379/2'
    cel=celery.Celery('test',backend=backend,broker=broker)
    @cel.task
    def add(x,y):
        return x+y

    包架构封装(多任务结构)

    project
        ├── celery_task      # celery包
        │   ├── __init__.py # 包文件
        │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
        │   └── tasks.py    # 所有任务函数
        ├── add_task.py      # 添加任务
        └── get_result.py   # 获取结果

    基本使用

    # 1)创建app + 任务
    
    # 2)启动celery(app)服务:
    # 非windows
    # 命令:celery worker -A celery_task -l info
    # windows:
    # pip3 install eventlet
    # celery worker -A celery_task -l info -P eventlet
    
    # 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本
    
    # 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
    
    
    from celery import Celery
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    celery.py
    from .celery import app
    import time
    @app.task
    def add(n, m):
        print(n)
        print(m)
        time.sleep(10)
        print('n+m的结果:%s' % (n + m))
        return n + m
    
    @app.task
    def low(n, m):
        print(n)
        print(m)
        print('n-m的结果:%s' % (n - m))
        return n - m
    tasks.py
    from celery_task import tasks
    
    # 添加立即执行任务
    t1 = tasks.add.delay(10, 20)
    t2 = tasks.low.delay(100, 50)
    print(t1.id)
    
    
    # 添加延迟任务
    from datetime import datetime, timedelta
    eta=datetime.utcnow() + timedelta(seconds=10)
    tasks.low.apply_async(args=(200, 50), eta=eta)
    add_task.py
    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '01e57ace-03b2-483b-9056-8e7289537e07'
    if __name__ == '__main__':
        async_result=AsyncResult(id=id, app=app)
        if async_result.successful():
            result = async_result.get()
            print(result)
        elif async_result.failed():
            print('任务失败')
        elif async_result.status == 'PENDING':
            print('任务等待中被执行')
        elif async_result.status == 'RETRY':
            print('任务异常后正在重试')
        elif async_result.status == 'STARTED':
            print('任务已经开始被执行')
    get_result.py

    高级使用

    # 1)创建app + 任务
    
    # 2)启动celery(app)服务:
    # 非windows
    # 命令:celery worker -A celery_task -l info
    # windows:
    # pip3 install eventlet
    # celery worker -A celery_task -l info -P eventlet
    
    # 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
    # 命令:celery beat -A celery_task -l info
    
    # 4)获取结果
    
    
    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'low-task': {
            'task': 'celery_task.tasks.low',
            'schedule': timedelta(seconds=3),
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
            'args': (300, 150),
        }
    }
    celery.py
    from .celery import app
    
    import time
    @app.task
    def add(n, m):
        print(n)
        print(m)
        time.sleep(10)
        print('n+m的结果:%s' % (n + m))
        return n + m
    
    
    @app.task
    def low(n, m):
        print(n)
        print(m)
        print('n-m的结果:%s' % (n - m))
        return n - m
    tasks.py
    from celery_task.celery import app
    
    from celery.result import AsyncResult
    
    id = '01e57ace-03b2-483b-9056-8e7289537e07'
    if __name__ == '__main__':
        async_result=AsyncResult(id=id, app=app)
        if async_result.successful():
            result = async_result.get()
            print(result)
        elif async_result.failed():
            print('任务失败')
        elif async_result.status == 'PENDING':
            print('任务等待中被执行')
        elif async_result.status == 'RETRY':
            print('任务异常后正在重试')
        elif async_result.status == 'STARTED':
            print('任务已经开始被执行')
    get_result.py

    django中使用

    """
    celery框架django项目工作流程
    1)加载django配置环境
    2)创建Celery框架对象app,配置broker和backend,得到的app就是worker
    3)给worker对应的app添加可处理的任务函数,用include配置给worker的app
    4)完成提供的任务的定时配置app.conf.beat_schedule
    5)启动celery服务,运行worker,执行任务
    6)启动beat服务,运行beat,添加任务
    
    重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
    """
    
    # 一、加载django配置环境
    import os
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
    
    # 二、加载celery配置环境
    from celery import Celery
    # broker
    broker = 'redis://127.0.0.1:6379/0'
    # backend
    backend = 'redis://127.0.0.1:6379/1'
    # worker
    app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
    
    
    # 时区
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 任务的定时配置
    from datetime import timedelta
    from celery.schedules import crontab
    app.conf.beat_schedule = {
        'update-banner-list': {
            'task': 'celery_task.tasks.update_banner_list',
            'schedule': timedelta(seconds=10),
            'args': (),
        }
    }
    celery.py
    from .celery import app
    
    from django.core.cache import cache
    from home import models, serializers
    from django.conf import settings
    @app.task
    def update_banner_list():
        queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]
        banner_list = serializers.BannerSerializer(queryset, many=True).data
        # 拿不到request对象,所以头像的连接base_url要自己组装
        for banner in banner_list:
            banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']
    
        cache.set('banner_list', banner_list, 86400)
        return True
    tasks.py

    一、什么是Celery

    1、celery是什么

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

    另外, Celery还支持不同的并发和序列化的手段

    • 并发:Prefork, Eventlet, gevent, threads/single threaded
    • 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等

    2、使用场景

    celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

    异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

    定时任务:定时执行某件事情,比如每天数据统计

    3、Celery具有以下优点

    Simple(简单)
    Celery 使用和维护都非常简单,并且不需要配置文件。
    
    Highly Available(高可用)
    woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。
    
    Fast(快速)
    单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)
    
    Flexible(灵活)
    Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。

    4、Celery安装

    你可以安装Celery通过Python包管理平台(PyPI)或者源码安装
    使用pip安装:

    1
    $ pip install -U Celery

    或着:

    1
    $ sudo easy_install Celery

    二、Celery执行异步任务

    1、基本使用

    创建项目celerypro

    创建异步任务执行文件celery_task:

    import celery
    import time
    backend='redis://127.0.0.1:6379/1'
    broker='redis://127.0.0.1:6379/2'
    cel=celery.Celery('test',backend=backend,broker=broker)
    @cel.task
    def send_email(name):
        print("向%s发送邮件..."%name)
        time.sleep(5)
        print("向%s发送邮件完成"%name)
        return "ok"  

    创建执行任务文件,produce_task.py:

    from celery_task import send_email
    result = send_email.delay("yuan")
    print(result.id)
    result2 = send_email.delay("alex")
    print(result2.id)  

    注意,异步任务文件命令执行:

    1
    celery worker -A celery_task -l info

    创建py文件:result.py,查看任务执行结果,

    from celery.result import AsyncResult
    from celery_task import cel
    
    async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)
    
    if async_result.successful():
        result = async_result.get()
        print(result)
        # result.forget() # 将结果删除
    elif async_result.failed():
        print('执行失败')
    elif async_result.status == 'PENDING':
        print('任务等待中被执行')
    elif async_result.status == 'RETRY':
        print('任务异常后正在重试')
    elif async_result.status == 'STARTED':
        print('任务已经开始被执行')

    2、多任务结构

     

    celery.py:

    from celery import Celery
    
    cel = Celery('celery_demo',
                 broker='redis://127.0.0.1:6379/1',
                 backend='redis://127.0.0.1:6379/2',
                 # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
                 include=['celery_tasks.task01',
                          'celery_tasks.task02'
                          ])
    
    # 时区
    cel.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    cel.conf.enable_utc = False

    task01.py,task02.py:

    #task01
    import time
    from celery_tasks.celery import cel
    
    @cel.task
    def send_email(res):
        time.sleep(5)
        return "完成向%s发送邮件任务"%res
    
    
    
    #task02
    import time
    from celery_tasks.celery import cel
    @cel.task
    def send_msg(name):
        time.sleep(5)
        return "完成向%s发送短信任务"%name

    produce_task.py:

    from celery_tasks.task01 import send_email
    from celery_tasks.task02 import send_msg
    
    # 立即告知celery去执行test_celery任务,并传入一个参数
    result = send_email.delay('yuan')
    print(result.id)
    result = send_msg.delay('yuan')
    print(result.id)

    check_result.py:

    from celery.result import AsyncResult
    from celery_tasks.celery import cel
    
    async_result = AsyncResult(id="562834c6-e4be-46d2-908a-b102adbbf390", app=cel)
    
    if async_result.successful():
        result = async_result.get()
        print(result)
        # result.forget() # 将结果删除,执行完成,结果不会自动删除
        # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
        # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    elif async_result.failed():
        print('执行失败')
    elif async_result.status == 'PENDING':
        print('任务等待中被执行')
    elif async_result.status == 'RETRY':
        print('任务异常后正在重试')
    elif async_result.status == 'STARTED':
        print('任务已经开始被执行')

    开启work:celery worker -A celery_tasks -l info -P eventlet,添加任务(执行produce_task.py),检查任务执行结果(执行check_result.py)

    三、Celery执行定时任务

    简单结构中:设定时间让celery执行一个定时任务,produce_task.py:  其他文件不变

    from celery_task import send_email
    from datetime import datetime
    
    # 方式一
    # v1 = datetime(2020, 3, 11, 16, 19, 00)
    # print(v1)
    # v2 = datetime.utcfromtimestamp(v1.timestamp())
    # print(v2)
    # result = send_email.apply_async(args=["egon",], eta=v2)
    # print(result.id)
    
    # 方式二
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    
    # 使用apply_async并设定时间
    result = send_email.apply_async(args=["egon"], eta=task_time)
    print(result.id)

    多任务结构中:celery.py修改如下: 不需要生产者produce_task.py直接用命令:celery beat -A celery_tasks

    from datetime import timedelta
    from celery import Celery
    from celery.schedules import crontab
    
    cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
        'celery_tasks.task01',
        'celery_tasks.task02',
    ])
    cel.conf.timezone = 'Asia/Shanghai'
    cel.conf.enable_utc = False
    
    cel.conf.beat_schedule = {
        # 名字随意命名
        'add-every-10-seconds': {
            # 执行tasks1下的test_celery函数
            'task': 'celery_tasks.task01.send_email',
            # 每隔2秒执行一次
            # 'schedule': 1.0,
            # 'schedule': crontab(minute="*/1"),
            'schedule': timedelta(seconds=6),
            # 传递参数
            'args': ('张三',)
        },
        # 'add-every-12-seconds': {
        #     'task': 'celery_tasks.task01.send_email',
        #     每年4月11号,8点42分执行
        #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        #     'args': ('张三',)
        # },
    1
    2
    3
    # 启动 Beat 程序$ celery beat -A celery_tasks # Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
     
    # 之后启动 worker 进程.$ celery -A celery_tasks worker -l info 或者$ celery -B -A celery_tasks worker -l info

    celery -A celery_tasks worker -l info -c 5  # -c 5表示并发数5个

    四、Django中使用celery

    项目根目录创建celery包,目录结构如下:

    mycelery/
    ├── config.py
    ├── __init__.py
    ├── main.py
    └── sms/
        ├── __init__.py
        ├── tasks.py

    配置文件config.py:

    broker_url = 'redis://127.0.0.1:6379/15'
    result_backend = 'redis://127.0.0.1:6379/14'

    任务文件tasks.py:

    # celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
    from mycelery.main import app
    import time
    
    
    import logging
    log = logging.getLogger("django")
    
    @app.task  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
    def send_sms(mobile):
        """发送短信"""
        print("向手机号%s发送短信成功!"%mobile)
        time.sleep(5)
    
        return "send_sms OK"
    
    @app.task  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
    def send_sms2(mobile):
        print("向手机号%s发送短信成功!" % mobile)
        time.sleep(5)
    
        return "send_sms2 OK"

    最后在main.py主程序中对django的配置文件进行加载

    # 主程序
    import os
    from celery import Celery
    # 创建celery实例对象
    app = Celery("sms")
    
    # 把celery和django进行组合,识别和加载django的配置文件
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')
    
    # 通过app对象加载配置
    app.config_from_object("mycelery.config")
    
    # 加载任务
    # 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
    # app.autodiscover_tasks(["任务1","任务2"])
    app.autodiscover_tasks(["mycelery.sms",])
    
    # 启动Celery的命令
    # 强烈建议切换目录到mycelery根目录下启动
    # celery -A mycelery.main worker --loglevel=info

    Django视图调用:

    from django.shortcuts import render
    
    # Create your views here.
    
    
    from django.shortcuts import render,HttpResponse
    from mycelery.sms.tasks import send_sms,send_sms2
    from datetime import timedelta
    
    from datetime import datetime
    def test(request):
    
        ################################# 异步任务
    
        # 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决
    
        # send_sms.delay("110")
        # send_sms2.delay("119")
        # send_sms.delay() 如果调用的任务函数没有参数,则不需要填写任何内容
    
    
        ################################# 定时任务
    
        # ctime = datetime.now()
        # # 默认用utc时间
        # utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
        # time_delay = timedelta(seconds=10)
        # task_time = utc_ctime + time_delay
        # result = send_sms.apply_async(["911", ], eta=task_time)
        # print(result.id)
    
        return HttpResponse('ok')

    基本结构

    Celery是由Python开发的一个简单、灵活、可靠的处理大量任务的分发系统,它不仅支持实时处理也支持任务调度。

    • user:用户程序,用于告知celery去执行一个任务。
    • broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
    • worker:执行任务

    celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

    版本和要求

    环境准备:

    • 安装rabbitMQ或Redis
          见:http://www.cnblogs.com/wupeiqi/articles/5132791.html
    • 安装celery
           pip3 install celery

    快速上手

    import time
    from celery import Celery
    
    app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
    
    
    @app.task
    def xxxxxx(x, y):
        time.sleep(10)
        return x + y
    s1.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from s1 import xxxxxx
    
    # 立即告知celery去执行xxxxxx任务,并传入两个参数
    result = xxxxxx.delay(4, 4)
    print(result.id)
    s2.py
    from celery.result import AsyncResult
    from s1 import app
    
    async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    s3.py

    执行 s1.py 创建worker(终端执行命令):

    1
    celery worker -A s1 -l info

    执行 s2.py ,创建一个任务并获取任务ID:

    1
    python3 s2.py

    执行 s3.py ,检查任务状态并获取结果:

    1
    python3 s3.py

    多任务结构

    1
    2
    3
    4
    5
    6
    pro_cel
        ├── celery_tasks# celery相关文件夹
        │   ├── celery.py   # celery连接和配置相关文件
        │   └── tasks.py    #  所有任务函数
        ├── check_result.py # 检查结果
        └── send_task.py    # 触发任务
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from celery import Celery
    
    celery = Celery('xxxxxx',
                    broker='redis://192.168.0.111:6379',
                    backend='redis://192.168.0.111:6379',
                    include=['celery_tasks.tasks'])
    
    # 时区
    celery.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    celery.conf.enable_utc = False
    pro_cel/celery_tasks/celery
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import time
    from .celery import celery
    
    
    @celery.task
    def xxxxx(*args, **kwargs):
        time.sleep(5)
        return "任务结果"
    
    
    @celery.task
    def hhhhhh(*args, **kwargs):
        time.sleep(5)
        return "任务结果"
    pro_cel/celery_tasks/tasks.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from celery.result import AsyncResult
    from celery_tasks.celery import celery
    
    async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    pro_cel/check_result.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import celery_tasks.tasks
    
    # 立即告知celery去执行xxxxxx任务,并传入两个参数
    result = celery_tasks.tasks.xxxxx.delay(4, 4)
    
    print(result.id)
    pro_cel/send_task.py

    更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html

    定时任务

    1. 设定时间让celery执行一个任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import datetime
    from celery_tasks.tasks import xxxxx
    """
    from datetime import datetime
     
    v1 = datetime(2017, 4, 11, 3, 0, 0)
    print(v1)
     
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)
     
    """
    ctime = datetime.datetime.now()
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
     
    s10 = datetime.timedelta(seconds=10)
    ctime_x = utc_ctime + s10
     
    # 使用apply_async并设定时间
    result = xxxxx.apply_async(args=[1, 3], eta=ctime_x)
    print(result.id)

    2. 类似于contab的定时任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    """
    celery beat -A proj
    celery worker -A proj -l info
     
    """
    from celery import Celery
    from celery.schedules import crontab
     
    app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ])
    app.conf.timezone = 'Asia/Shanghai'
    app.conf.enable_utc = False
     
    app.conf.beat_schedule = {
        # 'add-every-10-seconds': {
        #     'task': 'proj.s1.add1',
        #     'schedule': 10.0,
        #     'args': (16, 16)
        # },
        'add-every-12-seconds': {
            'task': 'proj.s1.add1',
            'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
            'args': (16, 16)
        },
    }

    注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。

    Flask中应用Celery

    1
    2
    3
    4
    5
    pro_flask_celery/
    ├── app.py
    ├── celery_tasks
        ├── celery.py
        └── tasks.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from flask import Flask
    from celery.result import AsyncResult
    
    from celery_tasks import tasks
    from celery_tasks.celery import celery
    
    app = Flask(__name__)
    
    TASK_ID = None
    
    
    @app.route('/')
    def index():
        global TASK_ID
        result = tasks.xxxxx.delay()
        # result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))
        TASK_ID = result.id
    
        return "任务已经提交"
    
    
    @app.route('/result')
    def result():
        global TASK_ID
        result = AsyncResult(id=TASK_ID, app=celery)
        if result.ready():
            return result.get()
        return "xxxx"
    
    
    if __name__ == '__main__':
        app.run()
    app.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from celery import Celery
    from celery.schedules import crontab
    
    celery = Celery('xxxxxx',
                    broker='redis://192.168.10.48:6379',
                    backend='redis://192.168.10.48:6379',
                    include=['celery_tasks.tasks'])
    
    # 时区
    celery.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    celery.conf.enable_utc = False
    celery_tasks/celery.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import time
    from .celery import celery
    
    
    @celery.task
    def hello(*args, **kwargs):
        print('执行hello')
        return "hello"
    
    
    @celery.task
    def xxxxx(*args, **kwargs):
        print('执行xxxxx')
        return "xxxxx"
    
    
    @celery.task
    def hhhhhh(*args, **kwargs):
        time.sleep(5)
        return "任务结果"
    celery_task/tasks.py

    Django中应用Celery

    一、基本使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    django_celery_demo
    ├── app01
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── migrations
    │   ├── models.py
    │   ├── tasks.py
    │   ├── tests.py
    │   └── views.py
    ├── db.sqlite3
    ├── django_celery_demo
    │   ├── __init__.py
    │   ├── celery.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── manage.py
    ├── red.py
    └── templates
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')
    
    app = Celery('django_celery_demo')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    django_celery_demo/celery.py
    from .celery import app as celery_app
    
    __all__ = ('celery_app',)
    django_celery_demo/__init__.py
    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    app01/tasks.py
    ...
    ....
    .....
    # ######################## Celery配置 ########################
    CELERY_BROKER_URL = 'redis://10.211.55.20:6379'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_RESULT_BACKEND = 'redis://10.211.55.20:6379'
    CELERY_TASK_SERIALIZER = 'json'
    django_celery_demo/settings.py
    from django.shortcuts import render, HttpResponse
    from app01 import tasks
    from django_celery_demo import celery_app
    from celery.result import AsyncResult
    
    
    def index(request):
        result = tasks.add.delay(1, 8)
        print(result)
        return HttpResponse('...')
    
    
    def check(request):
        task_id = request.GET.get('task')
        async = AsyncResult(id=task_id, app=celery_app)
        if async.successful():
            data = async.get()
            print('成功', data)
        else:
            print('任务等待中被执行')
    
        return HttpResponse('...')
    app01/views.py
    """django_celery_demo URL Configuration
    
    The `urlpatterns` list routes URLs to views. For more information please see:
        https://docs.djangoproject.com/en/1.11/topics/http/urls/
    Examples:
    Function views
        1. Add an import:  from my_app import views
        2. Add a URL to urlpatterns:  url(r'^$', views.home, name='home')
    Class-based views
        1. Add an import:  from other_app.views import Home
        2. Add a URL to urlpatterns:  url(r'^$', Home.as_view(), name='home')
    Including another URLconf
        1. Import the include() function: from django.conf.urls import url, include
        2. Add a URL to urlpatterns:  url(r'^blog/', include('blog.urls'))
    """
    from django.conf.urls import url
    from django.contrib import admin
    from app01 import views
    
    urlpatterns = [
        url(r'^admin/', admin.site.urls),
        url(r'^index/', views.index),
        url(r'^check/', views.check),
    ]
    django_celery_demo/urls.py

    二、定时任务

    1. 安装

    1
    install django-celery-beat

    2. 注册app

    1
    2
    3
    4
    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )

    3. 数据库去迁移生成定时任务相关表

    1
    python manage.py migrate

    4. 设置定时任务

    • 方式一:代码中配置
      复制代码
      #!/usr/bin/env python
      # -*- coding:utf-8 -*-
      
      import os
      from celery import Celery
      
      # set the default Django settings module for the 'celery' program.
      os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')
      
      app = Celery('django_celery_demo')
      
      # Using a string here means the worker doesn't have to serialize
      # the configuration object to child processes.
      # - namespace='CELERY' means all celery-related configuration keys
      #   should have a `CELERY_` prefix.
      app.config_from_object('django.conf:settings', namespace='CELERY')
      
      
      app.conf.beat_schedule = {
          'add-every-5-seconds': {
              'task': 'app01.tasks.add',
              'schedule': 5.0,
              'args': (16, 16)
          },
      }
      
      
      # Load task modules from all registered Django app configs.
      app.autodiscover_tasks()
      复制代码
    • 方式二:数据表录入

    5. 后台进程创建任务

    1
    celery -A django_celery_demo beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

    6. 启动worker执行任务

    1
    celery -A django_celery_demo worker -l INFO  

    官方参考:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django

  • 相关阅读:
    一次性解决window系统下,git日志乱码的问题
    多线程之线程状态,状态切换种类及代码实例
    mybatis 第一个demo,并记一次解决问题:Mapped Statements collection does not contain value for
    有100盏灯,分别写上编号1~100,同样地 有100个开关,写上编号1~100。当我按1号开关,写上1的倍数的灯会开/关(如果灯开着就关,相反地,关着就会开),当我按2号开关,写上2的倍数的灯会开/关,如此类推
    阿里云云服务器 centos 7.4 安装mysql 过程记录
    java实现树形输出
    MATLAB入门笔记
    经测试稳定可用的蓝牙链接通信Demo,记录过程中遇到的问题的思考和解决办法,并整理后给出一个Utils类可以简单调用来实现蓝牙功能
    View的相关原理(读书笔记)
    JAVA设计方法思考之如何实现一个方法执行完毕后自动执行下一个方法
  • 原文地址:https://www.cnblogs.com/bubu99/p/12873474.html
Copyright © 2011-2022 走看看