摘要:
1.场景描述
2.flask介绍
3.celery介绍
4.项目伪代码记录
5.几个备注点
内容:
1.场景描述
最近在优化用户画像的东西,要开发一个给文本打标签的服务;我这边需要提供一个HTTP的异步回调接口,具体来说就是客户端请求我之后,我判断请求体有没有问题,如果没有返回200状态吗;之后开始我的具体计算逻辑,客户端不用关心这中间要消耗多长时间,当我计算完成之后通过调用另一个HTTP接口,把计算结果返还客户端。
2.flask介绍
这个最近才接触,所以不敢妄自总结。所以还是搬来官网文档:http://docs.jinkan.org/docs/flask/
3.celery介绍
这个是我解决异步计算的分布式任务队列,其中用到了redis(这是之前总结的一些文档redis总结1,redis总结2)做消息中间件,具体的介绍还是引用官网:http://docs.jinkan.org/docs/celery/
4.项目伪代码记录
1 import json 2 import logging 3 from logging.handlers import TimedRotatingFileHandler 4 from time import sleep 5 6 from celery import Celery 7 from flask import Flask 8 from flask import request 9 import requests10 11 ##Flask configure 12 app = Flask(__name__) 13 14 ##celery configure 15 app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' 16 app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0' 17 celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) 18 celery.conf.update(app.config) 19 20
21 @celery.task
22 def my_background_task(args1,args2):
23 sleep(50)#这里是具体的处理逻辑,使用sleep代替
24 headers = {'content-type': 'application/json;charset=UTF-8'}
25 requests.post(url=reURL, data=data=json.dumps({'somedata':'xxxxx'}).encode('utf-8'),headers=headers)
26 27 @app.route('/get_tags') 28 def server_desc(): 29 param = json.loads(request.data) 30 if param: 31 return json.dump({'status':200}) 32 return json.dump({'status':400}) 33 34 35 if __name__ == '__main__': 36 app.run(host='0.0.0.0', port=4020)
5.几个备注点
1.服务启动顺序:redis-server,celery,flask
2.celery版本,我用的3.1.24,之前用的4.x版本报错not enough values to unpack,后来发现了这个issue:https://github.com/celery/celery/issues/4178
3.关于报错
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
解决方案:
export C_FORCE_ROOT="true"