Celery是一个用 Python 实现的任务队列框架(Task Queue),是一种在线程或机器间分发任务的机制。

一共有五部分组成,Beat、Broker、Worker、Task、Backend

  • Task:任务
  • Beat:定时触发
  • Broker: 接收消息,将消息放入队列
  • Worker:持续监听队列,接收消息后执行特定的任务
  • Backend:存储结果

我们平时经常也会做类似的工作,比如发邮件、发短信等用异步队列来做,把消息打入队列,然后由一个 Worker 来执行。Celery 是这个工作的集大成者。它在之上提供了以下几个功能:

  • 高可用性,与 Broker 的连接进行自动重连
  • 模块化,可随时替代连接池、 序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、 Broker 等
  • 可使用 Canvas 机制构建复杂工作流
  • 有完整的流水线监控
  • 可进行时间和速率限制
  • 提供了计划任务
  • 资源泄露保护
  • 自动扩展,保障服务质量
  • ……

下面是一个简单入门:

首先,使用 Docker 启动一个 rabbitmq 作为 Broker:


docker run -v rmq-data:/var/lib/rabbitmq/mnesia/node@rabbitmq -d -p 5672:5672 -p 15672:15672 --hostname rabbitmq --name rmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -e RABBITMQ_NODENAME=node@rabbitmq -e RABBITMQ_DEFAULT_VHOST=my_vhost rabbitmq:management-alpine

安装 Celery


pip install -U Celery

创建定时任务

首先创建一个文件:celery_config.py,内容如下:


from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    # 创建 schedule
    'cleanup-something': {
        'task': 'tasks.cleanup', # 指定 Task
        'schedule': timedelta(seconds=3),  # 每 3 秒一次
    }
}


创建文件 tasks.py

文件内容如下:


from celery import Celery
import time

app = Celery('tasks', broker='amqp://user:password@localhost:5672/my_vhost')

app.config_from_object('celery_config')

@app.task
def cleanup():
    # 定时清理一些东西
    print("cleanup something")
    # 等待 1 秒
    time.sleep(1)
    return {
        "status": "success"
    }


启动 worker:


celery worker -A tasks

启动成功后,我们创建一个 beat 来触发这个定时任务。


celery beat -A tasks

启动 beat 后会自动创建两个文件:

  • celerybeat-schedule.db
  • celerybeat.pid

用来存储 Beat 的信息。

此时,再看 Worker 的窗口,Worker 中的 Task 就不断地执行了。

[2020-04-24 11:11:22,065: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:25,066: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:28,066: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:31,066: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:34,066: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:37,065: WARNING/ForkPoolWorker-1] cleanup something
[2020-04-24 11:11:40,065: WARNING/ForkPoolWorker-1] cleanup something
……

创建异步任务

上面是定时任务,由独立进程 Beat 进行触发。下面我们创建一个处理订单的异步任务,需要传入订单 ID,由一个 Task 对订单进行异步处理。

首先在 tasks.py 中添加这个模拟任务,代码如下:


@app.task
def handle_order(order_id):
    print("handle order...", order_id)
    time.sleep(1)
    return {
        "status": "success"
    }


重启 worker,然后创建 test 文件,内容如下:


from tasks import handle_order
import time

# 异步调用

for i in range(10):
    # 每隔 1 秒提交一个订单
    handle_order.delay(i+1)
    print("send order", i+1)
    time.sleep(1)

直接执行:python test.py

可看到 Worker 的执行结果:

[2020-04-24 11:19:51,137: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:51,138: WARNING/ForkPoolWorker-1] 1
[2020-04-24 11:19:52,144: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:52,144: WARNING/ForkPoolWorker-1] 2
[2020-04-24 11:19:53,150: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:53,150: WARNING/ForkPoolWorker-1] 3
[2020-04-24 11:19:54,156: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:54,157: WARNING/ForkPoolWorker-1] 4
[2020-04-24 11:19:55,163: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:55,164: WARNING/ForkPoolWorker-1] 5
[2020-04-24 11:19:56,168: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:56,168: WARNING/ForkPoolWorker-1] 6
[2020-04-24 11:19:57,170: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:57,170: WARNING/ForkPoolWorker-1] 7
[2020-04-24 11:19:58,176: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:58,177: WARNING/ForkPoolWorker-1] 8
[2020-04-24 11:19:59,182: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:19:59,182: WARNING/ForkPoolWorker-1] 9
[2020-04-24 11:20:00,186: WARNING/ForkPoolWorker-1] handle order...
[2020-04-24 11:20:00,186: WARNING/ForkPoolWorker-1] 10

以上就是一个基础的 Celery 用法了。

获取结果

上面还没有使用到的部件是 Backend,这个用来存储状态和结果。

首先,我们准备一个 redis 来做 backend。可用的 backend 可看官方文档


docker run -d --name redis -p 6379:6379 -v redis-data:/data redis:5.0.5-alpine

在 celery_config.py 中添加下面的 Backend 的配置:


CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

添加一个异步任务


@app.task
def add(x, y):
    return x + y

重启 worker。

创建文件 test-add.py 添加代码:



from tasks import add

r = add.delay(1, 2)

# 获取结果
print(r.get(timeout=3))

还需要安装一下 redis 的插件


pip install -U celery[redis]

直接执行 python test-add.py 即可。

如果没有配置好 Backend 的话,会报 AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for' 的错

可得结果