zoukankan      html  css  js  c++  java
  • flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台

    转载:http://www.likecs.com/default/index/show?id=603

    **背景: 1.平时测试接口,总是现写代码,对测试用例的管理,以及测试报告的管理持久化做的不够,
                  2.工作中移动端开发和后端开发总是不能并行进行,需要一个mock的依赖来让他们并行开发。
                  3.同时让自己锻炼去开发测试平台,掌握flask开发程序,提高自己的业务水平。

    整体思路:    1.利用flask+bootstrap来进行web界面开发,对接口,接口测试用例,定时任务,测试报告的持续集成。
                               2.IAPTest支持接口用例管理,接口多用例测试,支持定时测试任务,测试报告持久化
                               3.目前mock服务支持单一path,定时任务可以开启暂停多用例执行,定时任务执行后自动发送测试报告,多用例的单次执行,单接口的调试功能。对测试环境的管理
    下面来看下最后的效果图,以及附上github开源地址。

    测试环境管理界面:

     



    定时任务界面:



    mock界面



    测试报告界面



    用例管理界面



    接口管理界面



    **核心代码分享区:**

    定时任务对应视图开发

    复制代码
    class AddtimingtaskView(MethodView):
        @login_required
        def get(self):
            return  render_template('addtimingtasks.html')
        @login_required
        def post(self):
            taskname=request.form['taskname']
            tinmingtime=request.form['time']
            to_email_data=request.form['to_email']
            cao_email=request.form['cao_email']
            weihu=request.form['weihu']
            if taskname =='':
                flash('任务名不能为空!')
                return render_template('addtimingtasks.html')
            if tinmingtime =='':
                flash('任务执行时间不能为空!')
                return render_template('addtimingtasks.html')
            if to_email_data=='':
                flash('发送给谁邮件不能为空!')
                return render_template('addtimingtasks.html')
            if weihu=='':
                flash('维护人邮件不能为空!')
                return render_template('addtimingtasks.html')
            taskname_is = Task.query.filter_by(taskname=taskname).first()
            if taskname_is:
                flash('任务已经存在请重新填写!')
                return render_template('addtimingtasks.html')
            new_task=Task(taskname=taskname,taskstart=tinmingtime,taskrepor_to=to_email_data,taskrepor_cao=cao_email,task_make_email=weihu,
                          makeuser=current_user.id)
            db.session.add(new_task)
            try:
                db.session.commit()
                flash('添加定时任务成功')
                return  redirect(url_for('timingtask'))
            except Exception as e:
                db.session.rollback()
                flash('添加过程貌似异常艰难!')
                return redirect(url_for('addtimingtasks'))
            return render_template('addtimingtasks.html')
    class Editmingtaskview(MethodView):
        @login_required
        def get(self,id):
            task_one=Task.query.filter_by(id=id).first()
            procjet=Project.query.all()
            if not task_one:
                flash('你编辑的不存在')
                return  redirect(url_for('timingtask'))
            return  render_template('Edittimingtasks.html',task_one=task_one,porjects=procjet)
        def post(self,id):
            task_one = Task.query.filter_by(id=id).first()
            procjet = Project.query.all()
            taskname = request.form['taskname']
            tinmingtime = request.form['time']
            to_email_data = request.form['to_email']
            cao_email = request.form['cao_email']
            weihu = request.form['weihu']
            if taskname =='':
                flash('任务名不能为空!')
                return render_template('addtimingtasks.html')
            if tinmingtime =='':
                flash('任务执行时间不能为空!')
                return render_template('addtimingtasks.html')
            if to_email_data=='':
                flash('发送给谁邮件不能为空!')
                return render_template('addtimingtasks.html')
            if weihu=='':
                flash('维护人邮件不能为空!')
                return render_template('addtimingtasks.html')
            task_one.taskname=taskname
            task_one.taskrepor_to=to_email_data
            task_one.taskrepor_cao=cao_email
            task_one.task_make_email=weihu
            task_one.makeuser=current_user.id
            try:
                db.session.commit()
                flash('编辑成功')
                return  redirect(url_for('timingtask'))
            except:
                db.session.rollback()
                flash('编辑出现问题!')
                return redirect(url_for('timingtask'))
            return render_template('Edittimingtasks.html', task_one=task_one,porjects=procjet)
    class DeteleTaskViee(MethodView):
        def get(self,id):
            task_one = Task.query.filter_by(id=id).first()
            if not task_one:
                flash('你编辑的不存在')
                return redirect(url_for('timingtask'))
            if task_one.status==True:
                flash('已经删除')
                return redirect(url_for('timingtask'))
            task_one.status=True
            try:
                db.session.commit()
                flash('删除任务成功')
                return redirect(url_for('timingtask'))
            except:
                db.session.rollback()
                flash('删除任务休息了')
                return redirect(url_for('timingtask'))
    @app.route('/gettest',methods=['POST'])
    @login_required
    def gettest():#ajax获取项目的测试用例
        projec=(request.get_data('project')).decode('utf-8')
        if not projec:
            return []
        proje=Project.query.filter_by(project_name=str(projec)).first()
        if not proje:
            return  []
        testyong=InterfaceTest.query.filter_by(projects_id=proje.id).all()
        testyong_list=[]
        for i in testyong:
            testyong_list.append({'name':i.Interface_name,'id':i.id})
        return   jsonify({'data':testyong_list})
    class TestforTaskView(MethodView):#为测试任务添加测试用例
        def get(self,id):
            procjet = Project.query.all()
            task_one=Task.query.filter_by(id=id).first()
            return  render_template('addtestyongfortask.html',task_one=task_one,procjets=procjet)
        def post(self,id):
            procjet = Project.query.all()
            task_one = Task.query.filter_by(id=id).first()
            proc_test=request.form.get('project')
            if proc_test =='':
                flash(u'不能不添加测试项目!')
                return render_template('addtestyongfortask.html', task_one=task_one, procjets=procjet)
            test_yongli=request.form.getlist('testyongli')
            if test_yongli=='':
                flash(u'亲你见过只有测试项目没有测试用例的测试任务吗!')
                return render_template('addtestyongfortask.html', task_one=task_one, procjets=procjet)
            for oldtask in task_one.interface.all():
                task_one.interface.remove(oldtask)
            task_one.prject=Project.query.filter_by(project_name=proc_test).first().id
            for yongli in test_yongli:
                task_one.interface.append(InterfaceTest.query.filter_by(id=yongli).first())
                db.session.add(task_one)
            try:
                db.session.commit()
                flash('任务更新用例成功')
                return  redirect(url_for('timingtask'))
            except:
                flash('任务更新用例失败')
                return redirect(url_for('timingtask'))
            return render_template('addtestyongfortask.html', task_one=task_one, procjets=procjet)
    class StartTaskView(MethodView):#开始定时任务
        @login_required
        def get(self,id):
            task=Task.query.filter_by(id=id).first()
            if len(task.interface.all())<=1:
                flash('定时任务执行过程的测试用例为多用例,请你谅解')
                return  redirect(url_for('timingtask'))
            try:
                scheduler.add_job(func=addtask, id=str(id), args=str(id),trigger=eval(task.taskstart),replace_existing=True)
                task.yunxing_status='启动'
                db.session.commit()
                flash(u'定时任务启动成功!')
                return  redirect(url_for('timingtask'))
            except Exception as e:
                flash(u'定时任务启动失败!请检查任务的各项内容各项内容是否正常')
                return redirect(url_for('timingtask'))
    class ZantingtaskView(MethodView):#暂停定时任务
        @login_required
        def get(self,id):
            task = Task.query.filter_by(id=id).first()
            try:
                scheduler.pause_job(str(id))
                task.yunxing_status = '暂停'
                db.session.commit()
                flash(u'定时任务暂停成功!')
                return redirect(url_for('timingtask'))
            except:
                task.yunxing_status = '创建'
                db.session.commit()
                flash(u'定时任务暂停失败!已经为您初始化')
                return redirect(url_for('timingtask'))
    class HuifutaskView(MethodView):#回复定时任务
        @login_required
        def get(self,id):
            task = Task.query.filter_by(id=id).first()
            try:
                scheduler.resume_job(str(id))
                task.yunxing_status='启动'
                db.session.commit()
                flash(u'定时任务恢复成功!')
                return redirect(url_for('timingtask'))
            except:
                task.yunxing_status = '创建'
                db.session.commit()
                flash(u'定时任务恢复失败!已经为您初始化')
                return redirect(url_for('timingtask'))
    class YichuTaskView(MethodView):#移除定时任务
        @login_required
        def get(self,id):
            task = Task.query.filter_by(id=id).first()
            try:
                scheduler.delete_job(str(id))
                task.yunxing_status='关闭'
                db.session.commit()
                flash(u'定时任务移除成功!')
                return redirect(url_for('timingtask'))
            except:
                task.yunxing_status = '创建'
                db.session.commit()
                flash(u'定时任务移除失败!已经为您初始化')
                return redirect(url_for('timingtask'))
    复制代码


    定时任务所执行的func代码

    复制代码
    def addtask(id):#定时任务执行的时候所用的函数
        in_id=int(id)
        task=Task.query.filter_by(id=in_id).first()
        starttime = datetime.datetime.now()
        star = time.time()
        day = time.strftime("%Y%m%d%H%M", time.localtime(time.time()))
        basedir = os.path.abspath(os.path.dirname(__file__))
        file_dir = os.path.join(basedir, 'upload')
        file = os.path.join(file_dir, (day + '.log'))
        if os.path.exists(file) is False:
            os.system('touch %s' % file)
        filepath = os.path.join(file_dir, (day + '.html'))
        if os.path.exists(filepath) is False:
            os.system(r'touch %s' % filepath)
        projecct_list = []
        model_list = []
        Interface_name_list = []
        Interface_url_list = []
        Interface_meth_list = []
        Interface_pase_list = []
        Interface_assert_list = []
        Interface_headers_list = []
        id_list = []
        for task_yongli in task.interface.all():
            id_list.append(task_yongli.id)
            projecct_list.append(task_yongli.projects)
            model_list.append(task_yongli.models)
            Interface_url_list.append(task_yongli.Interface_url)
            Interface_name_list.append(task_yongli.Interface_name)
            Interface_meth_list.append(task_yongli.Interface_meth)
            Interface_pase_list.append(task_yongli.Interface_pase)
            Interface_assert_list.append(task_yongli.Interface_assert)
            Interface_headers_list.append(task_yongli.Interface_headers)
        apitest = ApiTestCase(Interface_url_list, Interface_meth_list, Interface_pase_list, Interface_assert_list, file,
                              Interface_headers_list)
        result_toal, result_pass, result_fail, relusts, bask_list = apitest.testapi()
        endtime = datetime.datetime.now()
        end = time.time()
        createHtml(titles=u'接口测试报告', filepath=filepath, starttime=starttime, endtime=endtime, passge=result_pass,
                   fail=result_fail, id=id_list, name=projecct_list, headers=Interface_headers_list,
                   coneent=Interface_url_list, url=Interface_meth_list, meth=Interface_pase_list,
                   yuqi=Interface_assert_list, json=bask_list, relusts=relusts)
        hour = end - star
        user_id = User.query.filter_by(role_id=2).first().id
        new_reust = TestResult(Test_user_id=user_id, test_num=result_toal, pass_num=result_pass, fail_num=result_fail,
                               test_time=starttime, hour_time=hour, test_rep=(day + '.html'), test_log=(day + '.log'))
        email = EmailReport.query.filter_by(role_id=2, default_set=True).first()
        send_emails(sender=email.send_email, receivers=task.taskrepor_to, password=email.send_email_password,
                    smtp=email.stmp_email, port=email.port, fujian1=file, fujian2=filepath, subject=u'%s自动用例执行测试报告' % day,
                    url='%stest_rep'%(request.url_root))
        db.session.add(new_reust)
        db.session.commit()
    复制代码






    mock服务的一个请求方式的代码

    复制代码
     1 class MakemockserverView(MethodView):#做一个mock服务
     2     def get(self,path):#get请求方法
     3         huoqupath=Mockserver.query.filter_by(path=path,status=True).first()
     4         heders=request.headers
     5         method=request.method
     6         if not huoqupath:
     7             abort(404)
     8         if method.lower() !=huoqupath.methods:
     9             return  jsonify({'code':'-1','message':'请求方式错误!','data':''})
    10         if huoqupath.is_headers==True:
    11             if comp_dict(heders,huoqupath.headers) ==True:
    12                 if huoqupath.ischeck==True:
    13                     paerm = request.values.to_dict()
    14                     if dict_par(paerm,huoqupath.params)==True:
    15                         if huoqupath.rebacktype == 'json':
    16                             try:
    17                                 json_fan = json.dumps(huoqupath.fanhui)
    18                                 return jsonify({'code': '1', 'message': 'successs', 'data': json_fan})
    19                             except:
    20                                 return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''})
    21                         elif huoqupath.rebacktype == 'xml':
    22                             response = make_response(huoqupath.fanhui)
    23                             response.content_type = 'application/xml'
    24                             return response
    25                         else:
    26                             return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''})
    27                     else:
    28                         return jsonify({'code': '-4', 'message': '你输入的参数不正确', 'data': ''})
    29                 else:
    30                     if huoqupath.rebacktype=='json':
    31                         try:
    32                             json_fan=json.dumps(huoqupath.fanhui)
    33                             return  jsonify({'code': '1', 'message': 'successs', 'data':json_fan})
    34                         except:
    35                             return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''})
    36                     elif huoqupath.rebacktype =='xml':
    37                         response=make_response(huoqupath.fanhui)
    38                         response.content_type='application/xml'
    39                         return response
    40                     else:
    41                         return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''})
    42             else:
    43                 return jsonify({'code': '-3', 'message': '安全校验失败!', 'data': ''})
    44         if huoqupath.ischeck == True:
    45             paerm = request.values.to_dict()
    46             if dict_par(paerm, huoqupath.params) == True:
    47                 if huoqupath.rebacktype == 'json':
    48                     try:
    49                         json_fan = json.dumps(huoqupath.fanhui)
    50                         return jsonify({'code': '1', 'message': 'successs', 'data': json_fan})
    51                     except:
    52                         return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''})
    53                 elif huoqupath.rebacktype == 'xml':
    54                     response = make_response(huoqupath.fanhui)
    55                     response.content_type = 'application/xml'
    56                     return response
    57                 else:
    58                     return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''})
    59             else:
    60                 return jsonify({'code': '-4', 'message': '你输入的参数不正确', 'data': ''})
    61         else:
    62             if huoqupath.rebacktype == 'json':
    63                 try:
    64                     json_fan = json.dumps(huoqupath.fanhui)
    65                     return jsonify({'code': '1', 'message': 'successs', 'data': json_fan})
    66                 except:
    67                     return jsonify({'code': '-2', 'message': '你写入的返回不能正常json!请检查', 'data': ''})
    68             elif huoqupath.rebacktype == 'xml':
    69                 response = make_response(huoqupath.fanhui)
    70                 response.content_type = 'application/xml'
    71                 return response
    72             else:
    73                 return jsonify({'code': '-2', 'message': '你写入的类型目前系统不支持', 'data': ''}) #
    复制代码





    开源地址:https://github.com/liwanlei/FXTest

    使用说明:

    1.依赖包为requirements.txt文件下的,可能部分不全,需要什么可以自己使用中增加。
    2.目前由于考虑后续迁移内部使用的话,所以移除了注册功能,改为管理员后台添加方式,默认登录:liwanlei  密码:liwanlei
    3.部分功能调试还存在一定的问题,欢迎各位多提宝贵意见,


     部分功能欠缺:

    1.定时任务的持久化,现在处理容易受到运行过程中的宕机等情况重新启动服务器的定时任务全部需要开启
    2、mock接口只能支持单一的path
    3. 测试环境没有改为动态配置,动态支持。目前测试环境管理以及上线
    4。部分地方可能还会有不严谨性,但是工具可以使用。
    5、目前仅支持 http请求中的json格式的,
    6.大家可以多提意见,后续会优化,最近一直熬夜加班可能有些消息不能及时回复,还望谅解

  • 相关阅读:
    实验4 IIC通讯与EEPROM接口
    实验3 串口通信
    实验2 中断和定时计数器实验
    实验1 单片机IO口应用及数码管显示
    央行大小额支付系统
    银行各交易渠道的清算方式
    ATM跨行取款的清算方式
    POS机刷卡跨行交易的清算方式
    商业银行在CNAPS体系中对各种交易的处理
    支付相关名词解释
  • 原文地址:https://www.cnblogs.com/ceshi2016/p/9141357.html
Copyright © 2011-2022 走看看