前言
最近在使用flask开发的时候,遇到了一些需要放到后台去运行的任务,在django里面我会立马就想到django-celery这个库,但是去github搜索了一下flask类似的库,质量却是严重残次不齐,有的甚至已经年久失修了,花费了很久的时间去实践最后却是各种报错。
并且官方文档中的celery后套任务只适合那app.py模式的开发的,工厂模式好像只字未提,反正我是没有找到。为了避免更多的人继续踩坑,同时也给自己一个可以记录的文章,这里我就通过这篇博文简单说一说我的flask+celery的脱坑之旅吧。
安装
首先我的电脑是Windows10专业版,python版本是3.8.6
,只要你是python3.6+的应该问题不大
pip install flask
pip install celery==4.4.2 # 目前celery的最新版本是5.0,但是我们还是先使用4.4.2吧
GitHub中大部分的仓库都是基于celery3的,但是4.4.2修复了很多的问题,所以选择4.4.2版本。
参见:https://www.pythonheidong.com/blog/article/414978/9e3f20768743047ded01/
由于我的broker和backend设置的是redis所以还需要安装redis库
pip install redis
由于我们是用的redis做得broker,所以我们先启动redis。
另外值得注意的是在Win10上面使用celery4.4.2是需要安装一个eventlet库的,具体原因还请看看celery官方仓库的issues
地址:https://github.com/celery/celery/issues/4081
如果不安装的话是会报错的,报错详情:ValueError: not enough values to unpack (expected 3, got 0)
pip install eventlet
坑几何?
包括以下几种:
- 循环导入问题
- 没有在app上下文环境中
- 找不到task任务
我本人已经踩过这些坑了,所以下面直接就是正确的示例,是不是很OK!
单文件模式
flask单文件模式,就是常见的app = Flask(__name__)
模式。在一个文件中也可以完成的。
下面直接上代码
from flask import Flask
from celery import Celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://127.0.0.1:6379/1'
app.config['CELERY_RESULT_BACKEND'] = 'redis://127.0.0.1:6379/2'
celery_app = Celery(__name__,
broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND'])
@celery_app.task()
def add2(x, y):
return x + y
@app.route('/')
def index():
results = add2.delay(3, 5)
return str(results.wait())
if __name__ == '__main__':
app.run(debug=True)
这些就是单文件模式的代码,这其中我们添加了一个任务add2
,然后启动flask。
然后由于celery和flask是同级别的app,所以我们需要一个新的窗口启动celery,加入-P参数指定eventlet
当我们启动celery之后。看到最后一行的ready的时候,说明我们的celery已经启动成功了。然后再看有一个[tasks]位置下面有一个. app.add2
的标识说明我们的任务已经被添加成功了。如果[tasks]下面没有这种标识,说明我们的celery任务加载失败了。就是上面的坑3
这时我们打开浏览器访问以下我们的网址:http://127.0.0.1:5000/
同时我们查看一下celery的窗口:
打印出了任务执行的日志。单文件模式我们就讲到这里吧。
工厂模式
当然我们如果用flask写一个稍微复杂的东西的话,其实工厂模式我们应该用的更多。下面我们一起来看看工厂模式中的配置。首先我们先规划一个flask+celery的目录结构。
首先是一级目录flasker
,我们先在项目中创建这个目录。然后创建下面的文件:
文件名 | 作用 |
---|---|
__init__.py |
flask工厂模式app创建 |
config.py | 配置文件 |
tasks.py | celery任务模块 |
views.py | flask视图模块 |
workers.py | celery APP创建 |
首先呢我们需要在配置文件中写入东西
import os
SECRET_KEY = os.getenv('SECRET_KEY', "2021")
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
里面就写一个加密和celery的一些配置信息吧。
__init__.py
我们在__init__.py
文件中创建出flask的app
from flask import Flask
def create_app():
app = Flask(__name__)
app.config.from_pyfile('config.py')
register_blueprints(app)
return app
def create_celery_app():
app = Flask(__name__)
app.config.from_pyfile('config.py')
# 如果有extensions,仅添加extensions即可
return app
def register_blueprints(app):
from .views import th
app.register_blueprint(th, url_prefix='/')
可以看到我们创建了两个app第一个create_app()是flask项目的app,第二个是针对celery另外创建的flask-celery-app。这两个app有什么不同呢,主要是为了解决,循环引用的问题。在create_app中我们导入了视图,就是register_blueprints,但是在为celery中创建的我们没有导入。就是为了避免循环导入的问题。为什么会产生循环引用的问题呢?我在这里画一张图让我们可以更清晰的看到。
可以看大我们单独创建一个app的话是不必注册蓝图的,就不存在循环导入的问题。而直接使用flask的app由于需要注册蓝图到app中,所以就会产生循环导入的问题。
workers.py
上面的两个准备工作做好了,我们就可以创建一个celery的app了,我们新建一个workers的py文件,写入以下内容
from celery import Celery
from flasker import create_celery_app
def make_celery(app):
celery = Celery(app.import_name,
broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)
class ContextTask(celery.Task):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
flask_app = create_celery_app()
celery = make_celery(flask_app)
这其中主要的就是make_celery函数了,写法我们就复制的flask官方文档中的内容。
然后导入我们刚才创建的create_celery_app专门为celery服务的flask app,然后生成celery对象。
一切准备就绪后,我们开始创建任务
tasks.py
from .workers import celery
@celery.task()
def add2(x, y):
return x * y
我们导入celery app然后创建一个任务。
views.py
任务创建好之后我们创建一个蓝图来使用我们的celery任务。
from flask import Blueprint
from .tasks import add2
th = Blueprint('', __name__)
@th.route('/')
def index():
res = add2.delay(6, 6)
return str(res.wait())
这里我们创建了一个蓝图,然后导入add2
任务,并在index视图中使用该任务。
执行任务
接下来我们开始启动一下,首先我们启动flask > flask run
,同时和单文件模式一样我们还需要启动celery。
在这里我们启动celery我们有两种方式启动celery
启动方式1:
可以看到命令行提示信息,我们已经启动成功了。我们打开浏览器查看
可以看到执行成功了。再看看命令行。
任务已经成功的执行了。
启动方式2:
我们接着按照第二种启动方式执行一下。
和启动方式1不同的地方就是在红框中被框选的内容。可以看到启动成功了。但是这种启动方式有一个坑,什么坑呢 ,看红色的框框中的[tasks],没有发现任务,就是上面的坑之一喽。出现这种情况,我们应该怎么做呢?上面有一个django配置的文章,我们可以参照django的celery配置内容,在make_celery的函数return之前给它加入自动发现任务。具体内容如下:
...
celery.Task = ContextTask
celery.autodiscover_tasks([app.import_name])
return celery
加入autodiscover_tasks方法传入并传入应用名称的列表,加好之后启动celery。
启动好了之后,我们启动flask和celery,访问浏览器。
可以看到执行成功了。这样flask—celery就配置好了。
最后一个坑
当然还有两个坑我们没有说到,一个就是没有在app上下文中。第二就是找不到flask-app,第二个我现在已经找不到怎么复现了,所以我们直接就说不在应用上下文这个吧。这个怎么回事呢?就是当我们给make_celery中传入current_app时就会报这个错误。
from flask import current_app
celery = make_celery(current_app)
这样的原因我还不知道,可能以后看懂源码就知道了。。。
后记
好了flask+celery就配置完成了。
可能相比django+celery的配置就麻烦了许多,所以flask的学习就是要更多更多的去参考社区的资料。所以Google常备身边。