zoukankan      html  css  js  c++  java
  • Celery快速入门

    celery简介

    celery是基于python开发的分布式异步消息任务队列,说它是队列,其实并不是我们通常理解的那种存放数据或者任务的一个管道,可以看做是一个软件或者说是一个功能组件,其主要构成部分如下:

    • user:用户程序,用于告知celery去执行一个任务。
    • broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
    • worker:执行任务
      除了上述三个组件,还有一个可选的backend, 用于存放任务执行结果

    安装

    安装celery

    pip3 install celery

    安装redis

    1. 官方站点: http://download.redis.io/releases/, 下载最新版或者最新stable版,建议stable版本
    2. 解压源码并进入目录
    3. 不用configure
    4. 直接make
    5. make PREFIX=/usr/local/redis install
    6. 在解压的redis目录把redis.conf 复制到 /usr/local/redis/bin 目录下
    7. 修改redis.conf的daemonize yes bind 0.0.0.0
    8. 使用./redis-server redis.conf, redis-cli
    9. pip install -U celery[redis]

    使用

    基本使用

    t1.py 定义了Celery对象,并用该对象装饰我们自定义的函数

    import time
    from celery import Celery
    app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')
    @app.task
    def xxxooo(x, y):
        time.sleep(10)
        return x + y
    

    然后启动worker,实在t1.py当中,worker已经知道了自己的broker 和 backend 在哪里了
    前台启动 celery worker -A t1 -l info
    后台启动 celery multi start w1 -A proj -l info
    把celery可以识别的任务传给celery这个组件

    from t1 import xxxooo
    result = xxxooo.delay(4, 4)
    print(result.id)
    
    from celery.result import AsyncResult
    from s1 import app
    async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    生产上使用

    目录结果如下:
    pro_cel
    ├── celery_tasks # celery相关文件夹
    │ ├── celery.py # celery连接和配置相关文件
    │ └── tasks.py # 所有任务函数
    ├── check_result.py # 检查结果
    └── send_task.py # 触发任务

    celery.py

    文件名必须是celery.py

    from celery import Celery
    celery = Celery('xxxxxx',
                    broker='redis://localhost:6379',
                    backend='redis://localhost:6379',
                    include=['celery_tasks.tasks'])
    celery.conf.timezone = 'Asia/Shanghai'
    celery.conf.enable_utc = False
    

    tasks.py

    import time
    from .celery import celery
    @celery.task
    def xxxxx(*args, **kwargs):
        time.sleep(5)
        return "任务结果"
    @celery.task
    def hhhhhh(*args, **kwargs):
        time.sleep(5)
        return "任务结果"
    

    check_result

    from celery.result import AsyncResult
    from celery_tasks.celery import celery
    async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    

    send_task

    import celery_tasks.tasks
    # 立即告知celery去执行xxxxxx任务,并传入两个参数
    result = celery_tasks.tasks.xxxxx.delay(4, 4)
    print(result.id)
    

    使用方式:

    1. 启动worker 进入pro_cel目录下执行 celery worker -A celery_tasks -l info
    2. 执行 python3 send_tasks.py

    定时任务

    1. 设定时间让celery到了时间帮我们执行一次任务
    import datetime
    from celery_tasks.tasks import xxxxx
    ctime = datetime.datetime.now()
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) 
    s10 = datetime.timedelta(seconds=10)
    ctime_x = utc_ctime + s10
    result = xxxxx.apply_async(args=[1, 3], eta=ctime_x)
    print(result.id)
    
    1. 类似crontab的定时任务
    from celery import Celery
    from celery.schedules import crontab 
    app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ])
    app.conf.timezone = 'Asia/Shanghai'
    app.conf.enable_utc = False 
    app.conf.beat_schedule = {
         'add-every-10-seconds': {
             'task': 'celery_tasks.tasks.xxxxx',
             'schedule': 10.0,    # 每10s执行一次
             'args': (16, 16)
        },
        'add-every-12-seconds': {
            'task': 'celery_tasks.tasks.xxxxx',
            'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
            'args': (16, 16)
        },
    }
    

    创建Worker的方式并没有发行变化,但是这里要注意的是,每间隔一定时间后需要生产出来任务给Worker去执行,这里需要一个生产者beat
    ``celery beat -A celery_tasks #创建生产者 beat 你的 schedule 写在哪里,就要从哪里启动
    然后 celery worker -A proj -l celery_tasks

    使用celery遇到的坑

    celery不能用root用户启动问题

    from celery import Celery, platforms
    
    platforms.C_FORCE_ROOT = True  #加上这一行
    

    内存泄漏

    长时间运行Celery有可能发生内存泄露,可以像下面这样设置

    CELERYD_MAX_TASKS_PER_CHILD = 40 # 每个worker执行了多少任务就会死掉
    
  • 相关阅读:
    vulcanjs 包类型
    vulcanjs schemas&& collections
    vulcanjs 核心架构概念
    vulcanjs 开源工具方便快速开发react graphql meteor 应用
    ory Oathkeeper Ecosystem
    ory Oathkeeper docker-compose 安装运行
    benthos stream nats 集成试用
    benthos 几个方便的帮助命令
    benthos 通过配置文件配置 stream 说明
    benthos 通过rest api 配置 stream 说明
  • 原文地址:https://www.cnblogs.com/longyunfeigu/p/9088844.html
Copyright © 2011-2022 走看看