celery_tasks 异步任务
当我们需要批量的去执行一些接口,如测试平台的批量运行测试用例时,如果是同步任务的话,会等待用例一个个执行完毕才有返回结果。当点击运行后,后台直接返回一条信息,由celery服务来运行用例,这就是异步
celery_tasks 工作流
平台会通过celery提供的方法将我们的操作推到中间件如redis中,celery会启动一个服务用来监控redis,当redis中有数据时,会取数据取出来后运行完毕后,返回结果给redis,redis有结果后 就将数据传给平台
安装celery
1 pip install Celery
注意:
1、在起celery服务之前,需要本地或者远程启动redis服务,可在config.py文件中配置redis地地址
2、每次修改了celery文件,都需要从命令行重新启动celery服务
celery_tasks 目录结构
Django工程目录
config.py ---------------------配置中间件,以redis为例
1 # broker_url = "redis://:password@ip:port/4"
2 # result_backend = "redis://:password@ip:port/4"
3 #result_backend = "redis://ip:port/4"#没有密码
4
5 #本地redis
6 broker_url = "redis://127.0.0.1:6379/4" #读取数据地址
7 result_backend = "redis://127.0.0.1:6379/4" #结果返回数据地址
main.py ------------------启动tasks任务配置
1 from celery import Celery
2 app = Celery('sksystem') #创建异步对象
3
4 # 获取celery的配置信息
5 app.config_from_object('celery_tasks.config')
6 # 提供tasks的路径,自动发现需要运行的task任务
7 app.autodiscover_tasks(['celery_tasks.run'])
8
9 # 启动celery的方法 默认以cpu的核数多进程的方式启动
10 # celery -A 应用路径 worker -l 日志级别
11 # mac同学
12 # celery -A celery_tasks.main worker -l info 普通启动
13 # celery multi start w1 -A celery_tasks.main -l info --logfile=logs/celerylog.log --pidfile=logs/celerypid.pid 后台运行
14 # celery flower -A celery_tasks.main 打开一个web页面启动 需要提前安装下flow 安装命令:pip install flower
15
16 # win同学
17 # pip install eventlet
18 # celery -A celery_tasks.main worker -l info -P eventlet
19 # -- * - **** ---
20 # - ** ---------- [config]
21 # - ** ---------- .> app: sksystem:0x103d0deb8 启动是那个app的任务
22 # - ** ---------- .> transport: redis://10.168.100.21:6379/2 设置的broker的队列是那个
23 # - ** ---------- .> results: redis://10.168.100.21:6379/3 设置backend的存储地址
24 # - *** --- * --- .> concurrency: 4 (prefork) 默认启动的进程数
25 # -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
26 # --- ***** -----
27 # -------------- [queues]
28 # .> celery exchange=celery(direct) key=celery
29
30
31 # 在view视图中只需要导入tasks中写好的任务方法 通过任务方法调用delay()即可
32 # from celery_tasks.run.tasks import run_case
33 # 调用task任务 参数可以在delay中传递,正常调用一样
34 # run_case.delay() #此方法会返回任务task_id 同run.tasks中的 taskname.request.id
tasks.py --------------- 定义task方法
from celery_tasks.main import app
# 在celery中使用logger
from celery.utils.log import get_task_logger
logger = get_task_logger('django_server')
#用例运行
@app.task(name='run_case')
def run_case(case_id,user_id):
task_id = run_case.request.id #通过方法名.request.id 获取task_id
print("task_id",task_id)
print(case_id)
print(user_id)
#集合运行
@app.task(name='run_collection')
def run_collection(collecti_id,user_id):
task_id = run_collection.request.id #获取task_id
print("task_id",task_id)
print(collecti_id)
print(user_id)
Views.py ----------------------通过delay()方法启用celery服务
1 from celery_tasks.run.tasks import run_case 2 3 #固定celery的调用方式 4 #run_case.delay(1,2) #delay()内部传递参数 5 6 class CollectionRunView(View): #运行测试集合为例子 7 def post(self,request): 8 collect_id = request.POST.get('collect_id') 9 user_id = request.POST.get('user_id') 10 for item_id in json.loads(collect_id): 11 #当页面调用异步任务时,会返回一个唯一的任务id 12 task_id = run_collection.delay(item_id,user_id) #启动异步任务,delay方法支持像函数一样传参 13 models.CaseCollection.objects.filter(id=item_id).update(report_batch=task_id,status=3) 14 print("task_id",task_id) 15 return NbResponse()