Celery异步任务队列
目录结构树:
配置文件config.py:
# Broker (message middleman) address: Redis on localhost, port 6379, database 1.
broker_url = 'redis://127.0.0.1:6379/1'
主main.py:
import sys
import os

from celery import Celery
from flask import Flask
from flask_mail import Mail

# Make modules in the parent of the current working directory importable.
# NOTE(review): this depends on the directory the process is started from —
# confirm it matches how the worker is launched.
CELERY_DIR = os.path.dirname(os.getcwd())
sys.path.insert(0, CELERY_DIR)

import config

mail = Mail()
app = Flask(__name__)
# Pick the Flask config object registered under the value of the `env`
# environment variable in the `config.config` mapping.
app.config.from_object(config.config.get(os.environ.get('env')))
mail.init_app(app)


def make_celery(app):
    """Create a Celery application whose tasks run inside a Flask app context.

    Args:
        app: The Flask application; its ``import_name`` names the Celery app
            and its application context wraps every task call.

    Returns:
        The configured Celery instance.
    """
    # Create the Celery object, named after the Flask app.
    celery = Celery(app.import_name)
    # Load the Celery settings from the celery_tasks.config module.
    celery.config_from_object('celery_tasks.config')
    # Alternative (disabled in the original): celery.conf.update(app.config)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Task base class that enters the Flask app context before running."""

        abstract = True

        def __call__(self, *args, **kwargs):
            # Run the task body with the Flask application context pushed, so
            # extensions bound to `app` (e.g. `mail`) are usable inside tasks.
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    # Auto-discover tasks in the listed packages when the worker starts.
    celery.autodiscover_tasks(['celery_tasks.email'])
    return celery


celery = make_celery(app)
任务函数tasks.py:
from flask_mail import Message

import config
from celery_tasks.main import celery, mail


# The decorator registers send_email as a Celery task named 'send_email'.
@celery.task(name='send_email')
def send_email(to, subject, html_message):
    """Build an HTML e-mail and send it through the shared Flask-Mail instance.

    Args:
        to: Recipient address; wrapped into a one-element recipients list.
        subject: Subject line of the message.
        html_message: HTML body of the message.
    """
    message = Message(
        subject=subject,
        recipients=[to],
        html=html_message,
        sender=config.Config.MAIL_USERNAME,
    )
    mail.send(message)


if __name__ == '__main__':
    # Running this module directly enqueues one sample task.
    send_email.delay('xx@xx.com', 'xx', 'xx')
启动命令:
# Global options such as -A must precede the sub-command (required by Celery 5+; also valid on Celery 4).
celery -A main.celery worker -l info
发出任务函数:
# Enqueue the send_email task (arguments: to, subject, html_message) instead of calling it directly.
send_email.delay('xx@xx.com', 'xx', 'xx')