zoukankan      html  css  js  c++  java
  • python做一个http接口测试框架

    目录结构

    project

      |

      case#测试用例

        |

        suite#测试目录

      |

      logs#测试日志

      |

      papi#测试类

      |

      result#测试结果

      |

      setting.py#配置文件

    1、日志类,用于测试时日志记录 

    pyapilog.py
     1 # -*-coding:utf-8 -*-
     2 # !/usr/bin/python
     3 __author__ = 'dongjie'
     4 __data__ = '2015-05-20'
     5 
     6 import logging
     7 import datetime
     8 import os
     9 import setting
    10 logLevel = {
    11    1 : logging.NOTSET,
    12    2 : logging.DEBUG,
    13    3 : logging.INFO,
    14    4 : logging.WARNING,
    15    5 : logging.ERROR,
    16    6 : logging.CRITICAL
    17 }
    18 setFile = os.path.join(setting.root_dir, 'setting.ini')
    19 loggers = {}
    20 
    21 
    22 #  定义日志方法,从配置文件读取日志等级,且定义日志输出路径
    23 def pyapilog(**kwargs):
    24     global loggers
    25     log_level = setting.logLevel
    26     log_path = setting.logFile
    27     if os.path.exists(log_path):
    28         log_file = os.path.join(log_path, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')
    29     else:
    30         os.mkdir(r'%s' % log_path)
    31         log_file = os.path.join(log_path, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')
    32     logger = logging.getLogger()
    33     logger.setLevel(logLevel[log_level])
    34     if not logger.handlers:
    35         # 创建一个handler,用于写入日志文件
    36         fh = logging.FileHandler(log_file)
    37         fh.setLevel(logLevel[log_level])
    38         # 再创建一个handler,用于输出到控制台
    39         ch = logging.StreamHandler()
    40         ch.setLevel(logging.ERROR)
    41         # 定义handler的输出格式
    42         formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
    43         fh.setFormatter(formatter)
    44         ch.setFormatter(formatter)
    45         # 给logger添加handler
    46         logger.addHandler(fh)
    47         logger.addHandler(ch)
    48         loggers.update(dict(name=logger))
    49     return  logger

    2、http测试类

    httprequest.py
    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-20'
    
    from pyapilog import pyapilog
    import requests
    import json
    import urllib
    
    class SendHttpRequest(object):
        def __init__(self, url):
            self.url = url
        # post request
    
        def post(self, value=None):
            params = urllib.urlencode(value)
            try:
                req = requests.post(self.url + "?%s" % params)
            except Exception, err:
                print err
            if req.status_code == 200:
                pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (req.url, req.status_code))
            else:
                pyapilog().error(u"发送post请求: %s   服务器返回:  %s
     error info: %s " % (req.url, req.status_code, req.text))
            return req.text
    
        def post_json(self, value):
            head = {'content-type': 'application/json'}
            try:
                req = requests.post(self.url, data=json.dumps(value), headers=head)
                print req.url
            except Exception, err:
                print err
            if req.status_code == 200:
                pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (req.url, req.status_code))
                return req.text
            else:
                pyapilog().error(u"发送post请求: %s   服务器返回:  %s
     error info: %s " % (req.url, req.status_code, req.text))
    
        def get(self, value=None):
            try:
                req = requests.get(self.url, params=value)
            except Exception, err:
                print err
            if req.status_code == 200:
                pyapilog().info(u"发送get请求: %s   服务器返回:  %s" % (req.url, req.status_code))
            else:
                pyapilog().error(u"发送get请求: %s   服务器返回:  %s
     error info: %s " % (req.url, req.status_code, req.text))
            return req.text

    3、数据库操作类

    databasedriver.py# -*-coding:utf-8 -*# !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-21'
    import pymssql
    import MySQLdb
    import setting
    from pyapilog import pyapilog
    
    class sqldriver(object):
        def __init__(self, host, port, user, password, database):
            self.host = host
            self.port = port
            self.user = user
            self.password = password
            self.database = database
    
        # 执行SQLserver查询
        def exec_mssql(self, sql):
            try:
                conn = pymssql.connect(host=self.host,
                                       port=self.port,
                                       user=self.user,
                                       password=self.password,
                                       database=self.database,
                                       charset="utf8")
                cur = conn.cursor()
                if cur:
                    pyapilog().info(u"执行SQL语句|%s|" % sql)
                    cur.execute(sql)
                    rows = cur.fetchall()
                    if len(rows) == 0:
                        pyapilog().warning(u"没有查询到数据")
                    return rows
                else:
                    pyapilog().error(u"数据库连接不成功")
                conn.close()
            except Exception, e:
                pyapilog().error(e)
    
        # 执行Mysql查询
        def exec_mysql(self, sql):
            try:
                conn = MySQLdb.connect(host=self.host,
                                       port=self.port,
                                       user=self.user,
                                       passwd=self.password,
                                       db=self.database,
                                       )
                cur = conn.cursor()
                if cur:
                    pyapilog().info(u"执行SQL语句|%s|" % sql)
                    resList = cur.execute(sql)
                    return resList
            except Exception, e:
                pyapilog().error(e)
    
    # 执行sql语句返回结果
    def execsql(sql):
        config = setting.DATABASE
        driver = config.get("ENGINE")
        host = config.get("HOST")
        port = config.get("PORT")
        user = config.get("USER")
        password = config.get("PWD")
        database = config.get("DATABASE")
        if driver == "MYSQL":
            try:
                sql_result = sqldriver(
                    host=host,
                    port=port,
                    user=user,
                    password=password,
                    database=database
                ).exec_mysql(sql)
                return sql_result
            except Exception, e:
                pyapilog().error(e)
    
        elif driver == "MSSQL":
            try:
                sql_result = sqldriver(
                    host=host,
                    port=port,
                    user=user,
                    password=password,
                    database=database
                ).exec_mssql(sql)
                return sql_result
            except Exception, e:
                pyapilog().error(e)
    else: pyapilog().error(u"[%s]数据库配置支持MYSQL、MSSQL、ORACLE" % driver)

    4、解析json字符串

    dataprase.py
    
    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-21'
    import json
    import xmltodict
    from pyapilog import pyapilog
    
    # 解析json字符串
    class jsonprase(object):
        def __init__(self, json_value):
            try:
                self.json_value = json.loads(json_value)
            except ValueError, e:
                pyapilog().error(e)
    
        def find_json_node_by_xpath(self, xpath):
            elem = self.json_value
            nodes = xpath.strip("/").split("/")
            for x in range(len(nodes)):
                try:
                    elem = elem.get(nodes[x])
                except AttributeError:
                    elem = [y.get(nodes[x]) for y in elem]
            return elem
    
        def datalength(self, xpath="/"):
            return len(self.find_json_node_by_xpath(xpath))
    
        @property
        def json_to_xml(self):
            try:
                root = {"root": self.json_value}
                xml = xmltodict.unparse(root, pretty=True)
            except ArithmeticError, e:
                pyapilog().error(e)
            return xml
    
    # 解析xml字符串
    class xmlprase(object):
        def __init__(self, xml_value):
            self.xml_str = xml_value
    
        @property
        def xml_to_json(self):
            try:
                xml_dic = xmltodict.parse(self.xml_str,
                                          encoding="utf-8",
                                          process_namespaces=True,
                                          )
                json_str = json.dumps(xml_dic)
            except Exception, e:
                print e
            return json_str

    5、还有配置文件差点忘记说了

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-20'
    
    '''
        配置系统相关的参数,提供全局的相关配置
    '''
    import os
    import sys
    root_dir = '/'.join(os.path.realpath(__file__).split('/')[:-1])
    sys.path.append(root_dir)
    # log等级,1:notset 2:debug  3:info 4:warning 5:error 6:critical
    logLevel = 2
    # 日志文件路径
    logFile = os.path.join(root_dir, 'logs')
    
    # 数据库配置,支持MYSQL、MSSQL、ORACLE
    DATABASE = {
        "ENGINE": "MSSQL",
        "HOST": "",
        "PORT": 3433,
        "USER": "",
        "PWD": "",
        "DATABASE": ""
    }

    6、最后看看我们的测试用例吧,当然是数据驱动了

    # -*-coding:utf-8 -*-
    from ddt import ddt, data, unpack
    import unittest
    from papi.httprequest import SendHttpRequest
    from papi.dataparse import jsonprase, xmlprase
    
    @ddt
    class TestSingleRequest(unittest.TestCase):
        def setUp(self):
            self.url = "http://xxxxxxxxxxxxxxxxxxx/api/xxxxxx"
        @data(
            (32351, 6),
            (9555, 4)
        )
        @unpack
        def test_Single_right(self, sid, count):
            value = {"sid": sid, "count": count}
            data = SendHttpRequest(self.url).get(value)
            json_data = jsonprase(data)
            point_lat = json_data.find_json_node_by_xpath("/Point/Lat")
            point_lng = json_data.find_json_node_by_xpath("/Point/Lng")
            is_exists_map = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/IsExistsMap")
            size = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/Size")
            # 断言
            assert float(point_lat) != 0 and float(point_lng) != 0
            # 断言
            assert json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/DownUrl") is not None
            if is_exists_map == True:
                assert size != ""
    
        # 导常请求SingleRequest接口
        @data(
            ("abceeffffg", 6),
            (9555, "")
        )
        @unpack
        def test_Single_error(self, sid, count):
            value = {"sid": sid, "count": count}
            data = SendHttpRequest(self.url).get(value)
            self.assertEqual(data, u'{"Message":"请求无效。"}')
    
    @ddt
    class TourMaps(unittest.TestCase):
        def setUp(self):
            self.url = "http://xxxxxx/api/TourMap"
    
        @data(32351, 9555)
        def test_requests_online_xml(self, tourId):
            xml_url = self.url + "/%s" % tourId
            data = SendHttpRequest(xml_url).get()
            json_st = xmlprase(data).xml_to_json
            json_data = jsonprase(json_st)
            lng = json_data.find_json_node_by_xpath("/root/data/@lng")
            lat = json_data.find_json_node_by_xpath("/root/data/@lat")
            assert lng != "" and lat != ""
            son_tour = json_data.find_json_node_by_xpath("/root/data/data")
            assert len(son_tour) > 0
    
    class TourData(unittest.TestCase):
        def setUp(self):
            self.url = "http://xxxxxx/api/xxx"
    
        @data(
            (),
            (),
            (),
        )
        @unpack
        def test_tourList_Location_in_open(self):
            pass
    
        def test_tourList_Location_not_open(self):
            pass
    
        def test_tour_open_city(self):
            pass
    
    
    if __name__ == "__main__":
        suite = unittest.TestLoader().loadTestsFromTestCase(TourMaps, TestSingleRequest)
        unittest.TextTestRunner(verbosity=2).run(suite)

    关于python ddt查以参考https://ddt.readthedocs.org/en/latest/example.html

    测试结果生成,可以查看python nose相关文档,生成hmtl

    本文主要提供了代码,如果相关疑问可联系本人,邮箱d1988505j@163.com

  • 相关阅读:
    Json对象和字符串互转
    JSNOP调用。。。
    org.hibernate.LazyInitializationException: could not initialize proxy no Session
    myeclipse生成注解实体
    jquery判断浏览器和版本
    JSTL XML标签库
    ORACLE 月份不全,补全月份的sql
    js 上下左右键控制焦点
    google gson使用
    js判断undefined类型
  • 原文地址:https://www.cnblogs.com/donjor/p/4632317.html
Copyright © 2011-2022 走看看