zoukankan      html  css  js  c++  java
  • Celery

    Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度

    安装celery

    pip install celery

    创建一个celery实例

    创建s1文件

    from celery import Celery
    
    #tasks:当前模块的名称,可随意写,但是必须存在
    #broker:指定要使用的消息中间件,
    #中间件可参考:http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#id4
    app=Celery("tasks",broker="redis://10.0.0.23:6379/1")
    
    @app.task
    def add():
        return "add"
    @app.task
    def add1():
        return "add1"

    运行celery服务

    celery worker -A s1 -l info   #s1为文件名 info:日志级别

    调用任务(异步任务)

    创建一个新文件,使用 delay() 方法来调用任务

    from s1 import add
    
    s=add.delay()
    print(s)

    此时s1文件会报错,因为在celery4.0后不再执行windows系统,可以安装eventlet来解决报错,当让也可以使用4.0以前的版本

    Traceback (most recent call last):
      File "c:program filespython36libsite-packagesilliardpool.py", line 358, in workloop
        result = (True, prepare_result(fun(*args, **kwargs)))
      File "c:program filespython36libsite-packagesceleryapp	race.py", line 544, in _fast_trace_task
        tasks, accept, hostname = _loc
    ValueError: not enough values to unpack (expected 3, got 0)

    安装eventlet

    pip install eventlet

    指定eventlet启动

    celery worker -A s1 -l info -P eventlet #windows下需要使用 -P eventlet

    保存结果

    from celery import Celery
    #使用backend保存结果,这里使用了redis,推荐使用RabbitMQ
    app=Celery("tasks",broker="redis://10.0.0.23:6379/1",backend="redis://10.0.0.23:6379/2")
    
    @app.task
    def add():
        return "add"
    @app.task
    def add1():
        return "add1"

    查看redis数据库

    [root@node1 ~]# redis-cli -h 10.0.0.23
    10.0.0.23:6379> SELECT 2
    10.0.0.23:6379[2]> KEYS *
     1) "celery-task-meta-908f60a1-2c23-4403-ab89-518691d76f48"
     2) "celery-task-meta-8172677a-a5ad-48ac-93e6-8473ba564766"
     3) "celery-task-meta-4e116ff9-f51f-4797-beb0-b106ce609bac"
     4) "celery-task-meta-282dafcf-1bca-4414-bbc1-87759140bcd6"
     5) "celery-task-meta-6fe880a6-e9e2-4609-9fc3-6d284169c8f7"
     6) "celery-task-meta-1ad7acd9-bdf7-4a55-a7ca-4a7157975036"
     7) "celery-task-meta-6382ea5d-e328-427b-be22-4a6cca74078a"
     8) "celery-task-meta-7a260767-6246-457f-aa91-5598007a7dc2"
     9) "celery-task-meta-8d7b6686-ed63-473b-b4c9-db3ebc807421"
    10) "celery-task-meta-29acbc9d-539c-4db3-b07b-7ce6b7365825"

    查看数据

    10.0.0.23:6379[2]> get celery-task-meta-908f60a1-2c23-4403-ab89-518691d76f48
    "{"status": "SUCCESS", "result": "add", "traceback": null, "children": [], "task_id": "908f60a1-2c23-4403-ab89-518691d76f48", "date_done": "2019-04-16T12:26:04.267013"}"

    celery获取返回值

    from celery.result import AsyncResult #导入AsyncResult
    from s1 import add,app #导入s1文件中的app
    for i in range(10):
        s = add.delay()
        r = AsyncResult(id=s.id,app=app)
        print(r.get())

    带参数的返回值

    s1文件

    from celery import Celery
    #使用backend保存结果,这里使用了redis,推荐使用RabbitMQ
    app=Celery("tasks",broker="redis://10.0.0.23:6379/1",backend="redis://10.0.0.23:6379/2")
    
    @app.task
    def add(a,b):
        return ("add",a+b)
    @app.task
    def add1():
        return "add1"

    s2文件

    from celery.result import AsyncResult #导入AsyncResult
    from s1 import add,app #导入s1文件中的app
    for i in range(10):
        s = add.delay(3,5)
        # r = AsyncResult(id=str(s),app=app) #可以使用str
        r = AsyncResult(id=s.id,app=app) #也可以使用s.id
        print(r.get()) #返回值
        print(r.status) #获取执行状态
        print(r.successful()) # 获取执行状态

    获取报错信息

    from celery.result import AsyncResult #导入AsyncResult
    from s1 import add,app #导入s1文件中的app
    
    s = add.delay(3,5)
    print(s.id)
    r = AsyncResult(id=s.id,app=app) #也可以使用s.id
    
    #只获取报错信息
    print(r.get(propagate=False))
    #获取源文件的报错信息内容
    print(r.traceback)

    执行延时任务

    apply_async

    t=add.apply_async((1,2),countdown=5) #表示延迟5秒钟执行任务
    print(t)
    print(t.get())
    问题:是延迟5秒发送还是立即发送,消费者延迟5秒在执行那?

    支持的参数 :

    • countdown : 等待一段时间再执行.

      add.apply_async((2,3), countdown=5)
    • eta : 定义任务的开始时间.这里的时间是UTC时间,这里有坑

      add.apply_async((2,3), eta=now+tiedelta(second=10))
    • expires : 设置超时时间.

      add.apply_async((2,3), expires=60)
    • retry : 定时如果任务失败后, 是否重试.

      add.apply_async((2,3), retry=False)
    • retry_policy : 重试策略.    

    • max_retries : 最大重试次数, 默认为 3 次.
      interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.
      interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2
      interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .

    周期任务

    from c import task
    task.conf.beat_schedule={
        timezone='Asia/Shanghai',
        "each10s_task":{
            "task":"c.add",
            "schedule":3, # 每3秒钟执行一次
            "args":(10,10)
        },
    
    }

    其实celery也支持linux里面的crontab格式的书写的

    from celery.schedules import crontab
    task.conf.beat_schedule={
         timezone='Asia/Shanghai',
        "each3m_task":{
            "task":"c.add",
            "schedule":crontab(minute=3), #每小时的第3分钟执行
            "args":(10,10)
        },
         "each3m_task":{
            "task":"c.add",
            "schedule":crontab(minute=*/3), #每小时的第3分钟执行
            "args":(10,10)
        },
    }

    启动

    celery best -A s2 -l info 

    与django结合

    执行异步任务

    在项目目录下,与settings统计的目录中创建一个名为celery的文件 ,在生成的目录文件中添加celery文件,内容如下

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tests.settings') #与项目关联 tests为项目名称
    
    app = Celery('tests',backend='redis://10.211.55.19/3',broker='redis://10.211.55.19/4')
    # app = Celery('tests',backend='redis://:password@10.211.55.19/3',broker='redis://:password@10.211.55.19/4') #如果redis存在密码,password为密码
    #创建celery对象
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    #在django中创建celery的命名空间
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    #自动加载任务

    编辑settings.py同级目录的init.py

    from __future__ import absolute_import, unicode_literals
    from .celery import app as celery_app
    
    __all__ = ['celery_app']

    在项目中添加tasks文件,用来保存tasks的文件   

    from celery import shared_task
    
    @shared_task
    def add(x, y):
        return x + y
    
    @shared_task
    def mul(x, y):
        return x * y
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)

    添加views文件内容

    from .tasks import add
    
    def index(request):
        result = add.delay(2, 3)
        return HttpResponse('返回数据{}'.format(result.get()))

    启动worker

    celery -A tests  worker -l info

    添加url并调用

    执行周期性任务

    需要安装一个django的组件来完成这个事情

    pip install django-celery-beat

    将django-celery-beat添加到INSTALLED_APPS里面

    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )

    刷新到数据库

    python3 manage.py makemigrations #不执行这个会有问题
    python3 manage.py migrate

     admin配置

    启动beat

    celery -A tests beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

     启动worker

    celery -A tests worker -l info 
    
    
    
  • 相关阅读:
    js少写if语句
    框架大集合
    new运算符工作原理(new运算符的伪码实现)
    原始数据类型和引用数据类型
    关于input 的选中,自定义input[type="checkbox"]样式
    css伪类与伪元素
    js 的function为什么可以添加属性
    工具,如何去掉百度编辑器 ueditor 元素路径、字数统计等
    多种框架好库的混合使用
    js预编译的四部曲
  • 原文地址:https://www.cnblogs.com/wanglan/p/10720695.html
Copyright © 2011-2022 走看看