celery 是分布式任务队列,与调度工具 airflow 强强联合,可实现复杂的分布式任务调度,这就是 CeleryExecutor,有了 CeleryExecutor,你可以调度本地或远程机器上的作业,实现分布式任务调度。本文介绍如何配置 airflow 的 CeleryExecutor
操作步骤
CeleryExecutor 需要 Python 环境安装有 celery。
第一步: 安装celery
pip install celery
Celery 需要一个发送和接受消息的传输者 broker。RabbitMQ 和 Redis 官方推荐的生产环境级别的 broker,这里我们选用 Redis,只是因为安装起来非常方便,而 RabbitMQ 的安装需要 再安装 erlang 。
第二步:配置 airflow.cfg
修改 airflow.cfg
#修改 3 处: executor = CeleryExecutor broker_url = redis://127.0.0.1:6379/0 celery_result_backend = redis://127.0.0.1:6379/0
第三步:安装 python 的 redis 包,为启动 worker 作准备
pip install redis
可选:(配置rabbitMQ)
broker_url = amqp://ct:152108@localhost:5672/ct_airflow celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow executor = CeleryExecutor
第四步:运行 airflow
待确认步骤:初始化DB: ?
airflow initdb 若之前使用sqllite初始化过可使用 airflow resetdb
#启动webserver #后台运行 airflow webserver -p 9888-D airflow webserver -p 9888 #启动scheduler #后台运行 airflow scheduler -D airflow scheduler #启动worker #后台运行 airflow worker -D #如提示addres already use ,则查看 worker_log_server_port = 8793 是否被占用,如是则修改为 8974 等 #未被占用的端口 airflow worker #启动flower -- 可以不启动 #后台运行 airflow flower -D airflow flower
如果 airflow flower 运行报错,请执行 pip install flower 来安装 flower 。