zoukankan      html  css  js  c++  java
  • python做一个http接口测试框架

    目录结构

    project

      

      case#测试用例

        

        suite#测试目录

      

      logs#测试日志

      

      papi#测试类

      

      result#测试结果

      

      setting.py#配置文件

    1、日志类,用于测试时日志记录 

    pyapilog.py
    

      

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    # __author__ = 'dongjie'
    __data__ = '2015-05-20'

    import datetime
    import logging
    import os

    from yunjiweidian import setting

    # Registry that pyapilog() fills with the logger it configures.
    loggers = {}
    # Path of an ini file located in the settings root directory.
    setFile = os.path.join(setting.root_dir, 'setting.ini')
    # Translate the numeric level (1-6) into the matching ``logging`` constant.
    logLevel = dict(enumerate(
        (logging.NOTSET, logging.DEBUG, logging.INFO,
         logging.WARNING, logging.ERROR, logging.CRITICAL),
        1))


    # 定义日志方法,从配置文件读取日志等级,且定义日志输出路径
    def pyapilog(**kwargs):
        """Return the root logger, attaching file and console handlers once.

        The numeric level is read from ``setting.logLevel`` and translated
        through the module-level ``logLevel`` table; the log directory is
        read from ``setting.logFile`` and created when missing. Log records
        go to ``<log dir>/<YYYY-MM-DD>.log`` at the configured level and to
        the console at ERROR and above.

        :param kwargs: accepted for call compatibility; not used.
        :return: the configured root ``logging.Logger``.
        :raises OSError: if the log directory cannot be created.
        :raises KeyError: if ``setting.logLevel`` is not a key of ``logLevel``.
        """
        level = logLevel[setting.logLevel]
        log_dir = setting.logFile

        # makedirs (rather than mkdir) also creates missing parent directories.
        if not os.path.isdir(log_dir):
            try:
                os.makedirs(log_dir)
            except OSError:
                # Tolerate a concurrent creation, re-raise anything else.
                if not os.path.isdir(log_dir):
                    raise
        log_file = os.path.join(
            log_dir, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')

        logger = logging.getLogger()
        logger.setLevel(level)
        # Only attach handlers on first use, otherwise every call would
        # duplicate each log line.
        if not logger.handlers:
            formatter = logging.Formatter(
                '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

            # File handler: writes everything at the configured level.
            # Explicit UTF-8 because the framework logs Chinese text.
            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(level)
            file_handler.setFormatter(formatter)

            # Console handler: only errors and above.
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.ERROR)
            console_handler.setFormatter(formatter)

            logger.addHandler(file_handler)
            logger.addHandler(console_handler)

        # Key the registry by the logger's real name; the previous
        # dict(name=logger) always used the literal key "name".
        loggers[logger.name] = logger
        return logger

    2、http测试类

    httprequest.py
    

      

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-20'
    
    from pyapilog import pyapilog
    import requests
    import json
    import urllib
    
    class SendHttpRequest(object):
        """Send HTTP requests to a single URL and log each outcome.

        Every public method logs the final URL and status code: at INFO level
        for HTTP 200, at ERROR level (including the response body) otherwise.
        Transport failures are logged and re-raised so the caller sees the
        real error.
        """

        def __init__(self, url):
            """:param url: target URL used by every request of this instance."""
            self.url = url

        @staticmethod
        def _send(method, url, **kwargs):
            """Call ``method(url, **kwargs)``; log and re-raise any failure."""
            try:
                return method(url, **kwargs)
            except Exception as err:
                pyapilog().error(err)
                raise

        @staticmethod
        def _report(req, ok_fmt, err_fmt):
            """Log the response and return True when the status code is 200."""
            if req.status_code == 200:
                pyapilog().info(ok_fmt % (req.url, req.status_code))
                return True
            pyapilog().error(err_fmt % (req.url, req.status_code, req.text))
            return False

        def post(self, value=None):
            """POST with ``value`` encoded into the query string.

            :param value: mapping of query parameters, or None for none.
            :return: the response body as text, whatever the status code.
            """
            # Let requests build the query string: it accepts None and
            # handles URLs that already carry a query part.
            req = self._send(requests.post, self.url, params=value)
            self._report(
                req,
                u"发送post请求: %s  服务器返回:  %s",
                u"发送post请求: %s   服务器返回:  %s\n error info: %s ")
            return req.text

        def post_json(self, value):
            """POST ``value`` serialized as a JSON request body.

            :param value: JSON-serializable object.
            :return: the response body as text for HTTP 200, otherwise None.
            """
            head = {'content-type': 'application/json'}
            req = self._send(requests.post, self.url,
                             data=json.dumps(value), headers=head)
            succeeded = self._report(
                req,
                u"发送post请求: %s  服务器返回:  %s",
                u"发送post请求: %s   服务器返回:  %s\n error info: %s ")
            return req.text if succeeded else None

        def get(self, value=None):
            """GET with ``value`` as query parameters.

            :param value: mapping of query parameters, or None for none.
            :return: the response body as text, whatever the status code.
            """
            req = self._send(requests.get, self.url, params=value)
            self._report(
                req,
                u"发送get请求: %s   服务器返回:  %s",
                u"发送get请求: %s   服务器返回:  %s\n error info: %s ")
            return req.text

    3、数据库操作类

    databasedriver.py
    

      

    # -*-coding:utf-8 -*# !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-21'
    import pymssql
    import MySQLdb
    import setting
    from pyapilog import pyapilog
    
    class sqldriver(object):
        """Run a single SQL statement against SQL Server or MySQL.

        Each call opens its own connection and closes it before returning.
        Errors are logged through ``pyapilog`` and reported to the caller as
        a ``None`` return value.
        """

        def __init__(self, host, port, user, password, database):
            """Store the connection parameters used by both exec methods."""
            self.host = host
            self.port = port
            self.user = user
            self.password = password
            self.database = database

        def exec_mssql(self, sql):
            """Execute ``sql`` on SQL Server and return all fetched rows.

            :param sql: SQL statement to execute.
            :return: the rows from ``fetchall()``, or None if an error was
                logged.
            """
            conn = None
            try:
                conn = pymssql.connect(host=self.host,
                                       port=self.port,
                                       user=self.user,
                                       password=self.password,
                                       database=self.database,
                                       charset="utf8")
                cur = conn.cursor()
                if not cur:
                    pyapilog().error(u"数据库连接不成功")
                    return None
                pyapilog().info(u"执行SQL语句%s" % sql)
                cur.execute(sql)
                rows = cur.fetchall()
                if not rows:
                    pyapilog().warning(u"没有查询到数据")
                return rows
            except Exception as e:
                pyapilog().error(e)
            finally:
                # Always release the connection, including on the return and
                # error paths (the close used to be unreachable after return).
                if conn is not None:
                    conn.close()

        def exec_mysql(self, sql):
            """Execute ``sql`` on MySQL and return what ``cursor.execute`` returns.

            NOTE(review): unlike exec_mssql this returns the value of
            ``cursor.execute`` (presumably the affected-row count), not the
            fetched rows - confirm which one callers expect.

            :param sql: SQL statement to execute.
            :return: the result of ``cursor.execute(sql)``, or None if an
                error was logged.
            """
            conn = None
            try:
                conn = MySQLdb.connect(host=self.host,
                                       port=self.port,
                                       user=self.user,
                                       passwd=self.password,
                                       db=self.database,
                                       )
                cur = conn.cursor()
                if cur:
                    pyapilog().info(u"执行SQL语句%s" % sql)
                    resList = cur.execute(sql)
                    return resList
            except Exception as e:
                pyapilog().error(e)
            finally:
                # The connection was previously never closed.
                if conn is not None:
                    conn.close()
    
    # 执行sql语句返回结果
    def execsql(sql):
        """Execute ``sql`` on the database described by ``setting.DATABASE``.

        The ``ENGINE`` entry selects the backend: ``"MYSQL"`` uses
        ``sqldriver.exec_mysql`` and ``"MSSQL"`` uses ``sqldriver.exec_mssql``.
        Any other value is logged as an error.

        :param sql: SQL statement to execute.
        :return: whatever the selected ``sqldriver`` method returns, or None
            when the engine is not handled or an error was logged.
        """
        config = setting.DATABASE
        driver = config.get("ENGINE")

        # Only these two engines have an implementation in sqldriver.
        if driver not in ("MYSQL", "MSSQL"):
            pyapilog().error(u"[%s]数据库配置支持MYSQL、MSSQL、ORACLE" % driver)
            return None

        try:
            connection = sqldriver(
                host=config.get("HOST"),
                port=config.get("PORT"),
                user=config.get("USER"),
                password=config.get("PWD"),
                database=config.get("DATABASE")
            )
            if driver == "MYSQL":
                return connection.exec_mysql(sql)
            return connection.exec_mssql(sql)
        except Exception as e:
            pyapilog().error(e)

    4、解析json字符串

    dataparse.py
    

      

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-21'
    import json
    import xmltodict
    from pyapilog import pyapilog
    
    # 解析json字符串
    class jsonprase(object):
        """Parse a JSON string and look up nodes with slash-separated paths."""

        def __init__(self, json_value):
            """Parse ``json_value``.

            On invalid input the error is logged and ``self.json_value`` is
            left as None, so later lookups return None instead of failing on
            a missing attribute.

            :param json_value: JSON document as a string.
            """
            self.json_value = None
            try:
                self.json_value = json.loads(json_value)
            except (TypeError, ValueError) as e:
                pyapilog().error(e)

        def find_json_node_by_xpath(self, xpath):
            """Return the node addressed by ``xpath`` (e.g. ``"/Point/Lat"``).

            Each path segment is looked up as a key. When the current node is
            a list, the key is looked up in every dict element and a list of
            the results is returned for that step. Empty segments are
            ignored, so ``"/"`` addresses the document root.

            :param xpath: slash-separated key path.
            :return: the addressed value, or None when a segment cannot be
                resolved.
            """
            elem = self.json_value
            for node in xpath.strip("/").split("/"):
                if not node:
                    continue
                if isinstance(elem, dict):
                    elem = elem.get(node)
                elif isinstance(elem, list):
                    elem = [y.get(node) if isinstance(y, dict) else None
                            for y in elem]
                else:
                    # Scalar or missing node: nothing deeper to descend into.
                    return None
            return elem

        def datalength(self, xpath="/"):
            """Return ``len()`` of the node at ``xpath`` (the root by default).

            :raises TypeError: if the node is missing or has no length.
            """
            return len(self.find_json_node_by_xpath(xpath))

        @property
        def json_to_xml(self):
            """The parsed document as pretty-printed XML under a ``root`` element.

            :return: the XML string, or None if conversion failed (the error
                is logged).
            """
            xml = None
            try:
                root = {"root": self.json_value}
                xml = xmltodict.unparse(root, pretty=True)
            except Exception as e:
                pyapilog().error(e)
            return xml
    
    # 解析xml字符串
    class xmlprase(object):
        """Convert an XML string into a JSON string."""

        def __init__(self, xml_value):
            """:param xml_value: XML document as a string."""
            self.xml_str = xml_value

        @property
        def xml_to_json(self):
            """The XML document serialized as a JSON string.

            Parsing uses ``xmltodict.parse`` with namespace processing
            enabled.

            :return: the JSON string, or None if parsing or serialization
                failed (the error is logged).
            """
            json_str = None
            try:
                xml_dic = xmltodict.parse(self.xml_str,
                                          encoding="utf-8",
                                          process_namespaces=True,
                                          )
                json_str = json.dumps(xml_dic)
            except Exception as e:
                # Log through the framework logger, like the JSON parser does.
                pyapilog().error(e)
            return json_str
    View Code

    5、还有配置文件差点忘记说了

    setting.py
    

      

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-20'
    
    '''
        配置系统相关的参数,提供全局的相关配置
    '''
    import os
    import sys
    # Directory that contains this settings file. os.path.dirname works with
    # any path separator, unlike splitting the path on '/'.
    root_dir = os.path.dirname(os.path.realpath(__file__))
    # Make modules in the project root importable, without adding duplicates.
    if root_dir not in sys.path:
        sys.path.append(root_dir)
    # Log level: 1 notset, 2 debug, 3 info, 4 warning, 5 error, 6 critical
    logLevel = 2
    # Directory where log files are written
    logFile = os.path.join(root_dir, 'logs')

    # Database connection settings. ENGINE selects the backend; execsql
    # handles the values "MYSQL" and "MSSQL".
    DATABASE = {
        "ENGINE": "MSSQL",
        "HOST": "",
        "PORT": 3433,
        "USER": "",
        "PWD": "",
        "DATABASE": ""
    }
    

      6、最后看看我们的测试用例吧,当然是数据驱动了

    # -*-coding:utf-8 -*-
    from ddt import ddt, data, unpack
    import unittest
    from papi.httprequest import SendHttpRequest
    from papi.dataparse import jsonprase, xmlprase
    
    @ddt
    class TestSingleRequest(unittest.TestCase):
        """Data-driven tests that send GET requests with ``sid`` and ``count``."""

        def setUp(self):
            self.url = "http://xxxxxxxxxxxxxxxxxxx/api/xxxxxx"

        @data(
            (32351, 6),
            (9, 4)
        )
        @unpack
        def test_Single_right(self, sid, count):
            """Valid parameters return coordinates and a download URL."""
            value = {"sid": sid, "count": count}
            # Named "body" so the ddt ``data`` decorator is not shadowed.
            body = SendHttpRequest(self.url).get(value)
            json_data = jsonprase(body)
            point_lat = json_data.find_json_node_by_xpath("/Point/Lat")
            point_lng = json_data.find_json_node_by_xpath("/Point/Lng")
            is_exists_map = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/IsExistsMap")
            size = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/Size")
            # unittest assertions are not stripped under ``python -O`` and
            # report the offending values on failure.
            self.assertNotEqual(float(point_lat), 0)
            self.assertNotEqual(float(point_lng), 0)
            self.assertIsNotNone(
                json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/DownUrl"))
            # Size is only checked when the response says a map exists.
            if is_exists_map == True:
                self.assertNotEqual(size, "")

        # Invalid requests against the same endpoint
        @data(
            ("abceeffffg", 6),
            (9, "")
        )
        @unpack
        def test_Single_error(self, sid, count):
            """Invalid parameters return the "invalid request" message."""
            value = {"sid": sid, "count": count}
            body = SendHttpRequest(self.url).get(value)
            self.assertEqual(body, u'{"Message":"请求无效。"}')
    
    @ddt
    class TourMaps(unittest.TestCase):
        """Data-driven tests that fetch an XML document per tour id."""

        def setUp(self):
            self.url = "http://xxxxxx/api/TourMap"

        @data(32351, 9)
        def test_requests_online_xml(self, tourId):
            """The XML response carries coordinates and child ``data`` nodes."""
            xml_url = self.url + "/%s" % tourId
            # Named "body" so the ddt ``data`` decorator is not shadowed.
            body = SendHttpRequest(xml_url).get()
            # Convert the XML to JSON so the same path lookup can be reused.
            json_st = xmlprase(body).xml_to_json
            json_data = jsonprase(json_st)
            lng = json_data.find_json_node_by_xpath("/root/data/@lng")
            lat = json_data.find_json_node_by_xpath("/root/data/@lat")
            # unittest assertions are not stripped under ``python -O``.
            self.assertNotEqual(lng, "")
            self.assertNotEqual(lat, "")
            son_tour = json_data.find_json_node_by_xpath("/root/data/data")
            self.assertGreater(len(son_tour), 0)
    
    # @ddt is required on the class for @data/@unpack to expand test methods.
    @ddt
    class TourData(unittest.TestCase):
        """Placeholder test case; the test bodies are not implemented yet."""

        def setUp(self):
            self.url = "http://xxxxxx/api/xxx"

        @data(
            (),
            (),
            (),
        )
        @unpack
        def test_tourList_Location_in_open(self):
            pass

        def test_tourList_Location_not_open(self):
            pass

        def test_tour_open_city(self):
            pass
    
    
    if __name__ == "__main__":
        # loadTestsFromTestCase accepts a single TestCase class, so load each
        # class separately and combine the results into one suite.
        loader = unittest.TestLoader()
        suite = unittest.TestSuite(
            loader.loadTestsFromTestCase(case)
            for case in (TourMaps, TestSingleRequest)
        )
        unittest.TextTestRunner(verbosity=2).run(suite)
    View Code

    关于 python ddt 可以参考 https://ddt.readthedocs.org/en/latest/example.html

    测试结果生成,可以查看 python nose 相关文档,生成 html

  • 相关阅读:
    C# Devexpress GridView获得当前活动行(选中行)的索引
    C# Devexpress GridView获得当前活动行(选中行)的索引
    计算机网络学习总结(超赞!!!)
    计算机网络学习总结(超赞!!!)
    30张图带你彻底理解红黑树
    30张图带你彻底理解红黑树
    DevExpress GridView 添加和设置右键菜单
    DevExpress GridView 添加和设置右键菜单
    数据库分离和附加 (SQL Server)
    数据库分离和附加 (SQL Server)
  • 原文地址:https://www.cnblogs.com/clarke/p/5752574.html
Copyright © 2011-2022 走看看