zoukankan      html  css  js  c++  java
  • python做一个http接口测试框架

    目录结构

    project

      

      case#测试用例

        

        suite#测试目录

      

      logs#测试日志

      

      papi#测试类

      

      result#测试结果

      

      setting.py#配置文件

    1、日志类,用于测试时日志记录 

    pyapilog.py
    

      

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    # __author__ = 'dongjie'
    __data__ = '2015-05-20'

    import datetime
    import logging
    import os

    from yunjiweidian import setting

    # Translates the numeric level stored in setting.logLevel (1-6) into the
    # matching constant of the standard logging module.
    logLevel = {
        1: logging.NOTSET,
        2: logging.DEBUG,
        3: logging.INFO,
        4: logging.WARNING,
        5: logging.ERROR,
        6: logging.CRITICAL,
    }

    # Path built from the project root and the file name 'setting.ini'.
    setFile = os.path.join(setting.root_dir, 'setting.ini')

    # Module-level registry that pyapilog() stores its logger in.
    loggers = {}


    # 定义日志方法,从配置文件读取日志等级,且定义日志输出路径
    def pyapilog(**kwargs):
        """Return the root logger, attaching its handlers on first use.

        The level comes from ``setting.logLevel`` (translated through the
        module-level ``logLevel`` table) and the log directory from
        ``setting.logFile``. The directory is created when it is missing.
        The log file inside it is named after the current date
        (``YYYY-MM-DD.log``).

        Handlers are only added while the root logger has none, so repeated
        calls do not duplicate output:

        * a file handler at the configured level, and
        * a console handler that only shows ERROR and above.

        :param kwargs: accepted for backward compatibility; not used.
        :return: the configured root ``logging.Logger``.
        """
        log_dir = setting.logFile
        # makedirs instead of mkdir: a missing parent directory must not
        # prevent the logger from being created.
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        log_file = os.path.join(
            log_dir, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')

        level = logLevel[setting.logLevel]
        logger = logging.getLogger()
        logger.setLevel(level)

        if not logger.handlers:
            formatter = logging.Formatter(
                '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

            # Handler that writes to the dated log file.
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(level)
            file_handler.setFormatter(formatter)

            # Handler that echoes errors to the console.
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.ERROR)
            console_handler.setFormatter(formatter)

            logger.addHandler(file_handler)
            logger.addHandler(console_handler)

        # NOTE(review): the registry key is the literal string "name", exactly
        # as in the original listing; kept unchanged for compatibility.
        loggers.update(dict(name=logger))
        return logger

    2、http测试类

    httprequest.py
    

      

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-20'
    
    from pyapilog import pyapilog
    import requests
    import json
    import urllib
    
    class SendHttpRequest(object):
        """Send HTTP requests to a fixed URL and log every response.

        Each method issues one request against ``self.url``, writes the final
        URL and the status code to the log returned by ``pyapilog()`` (INFO
        for status 200, ERROR together with the response body otherwise) and
        hands the response body back as text.

        If the request itself fails (``requests`` raises), the error is logged
        and re-raised. The original listing printed the error and then failed
        on an unbound local variable instead.
        """

        def __init__(self, url):
            """:param url: target URL used by every request of this instance."""
            self.url = url

        def post(self, value=None):
            """POST to the URL with ``value`` encoded into the query string.

            :param value: mapping of query parameters, or None for no
                parameters.
            :return: response body text, whatever the status code is.
            """
            try:
                # Let requests build the query string: this handles None and
                # non-ASCII values, which a manual urlencode() call does not.
                resp = requests.post(self.url, params=value)
            except Exception as err:
                pyapilog().error(err)
                raise
            if resp.status_code == 200:
                pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (resp.url, resp.status_code))
            else:
                pyapilog().error(u"发送post请求: %s   服务器返回:  %s\n error info: %s " % (resp.url, resp.status_code, resp.text))
            return resp.text

        def post_json(self, value):
            """POST ``value`` serialised as a JSON request body.

            :param value: object passed to ``json.dumps``.
            :return: response body text when the status code is 200,
                otherwise None (after logging the error).
            """
            head = {'content-type': 'application/json'}
            try:
                resp = requests.post(self.url, data=json.dumps(value), headers=head)
            except Exception as err:
                pyapilog().error(err)
                raise
            if resp.status_code == 200:
                pyapilog().info(u"发送post请求: %s  服务器返回:  %s" % (resp.url, resp.status_code))
                return resp.text
            pyapilog().error(u"发送post请求: %s   服务器返回:  %s\n error info: %s " % (resp.url, resp.status_code, resp.text))
            return None

        def get(self, value=None):
            """GET the URL with ``value`` as query parameters.

            :param value: mapping of query parameters, or None.
            :return: response body text, whatever the status code is.
            """
            try:
                resp = requests.get(self.url, params=value)
            except Exception as err:
                pyapilog().error(err)
                raise
            if resp.status_code == 200:
                pyapilog().info(u"发送get请求: %s   服务器返回:  %s" % (resp.url, resp.status_code))
            else:
                pyapilog().error(u"发送get请求: %s   服务器返回:  %s\n error info: %s " % (resp.url, resp.status_code, resp.text))
            return resp.text

    3、数据库操作类

    databasedriver.py
    

      

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-21'
    import pymssql
    import MySQLdb
    import setting
    from pyapilog import pyapilog
    
    class sqldriver(object):
        """Run a single SQL statement against SQL Server or MySQL.

        Every call opens its own connection and closes it again before
        returning. Errors are logged through ``pyapilog()`` and swallowed, in
        which case the method returns None.
        """

        def __init__(self, host, port, user, password, database):
            """Store the connection parameters; no connection is opened here."""
            self.host = host
            self.port = port
            self.user = user
            self.password = password
            self.database = database

        def exec_mssql(self, sql):
            """Execute ``sql`` on SQL Server through pymssql.

            :param sql: statement to execute.
            :return: the rows from ``cursor.fetchall()`` (a warning is logged
                when there are none), or None when an error was logged.
            """
            conn = None
            try:
                conn = pymssql.connect(host=self.host,
                                       port=self.port,
                                       user=self.user,
                                       password=self.password,
                                       database=self.database,
                                       charset="utf8")
                cur = conn.cursor()
                if not cur:
                    pyapilog().error(u"数据库连接不成功")
                    return None
                pyapilog().info(u"执行SQL语句%s" % sql)
                cur.execute(sql)
                rows = cur.fetchall()
                if len(rows) == 0:
                    pyapilog().warning(u"没有查询到数据")
                return rows
            except Exception as e:
                pyapilog().error(e)
            finally:
                # The original only reached conn.close() on the failure
                # branch; close on every path so connections are not leaked.
                if conn is not None:
                    conn.close()

        def exec_mysql(self, sql):
            """Execute ``sql`` on MySQL through MySQLdb.

            :param sql: statement to execute.
            :return: whatever ``cursor.execute(sql)`` returns, or None when an
                error was logged.
            """
            conn = None
            try:
                conn = MySQLdb.connect(host=self.host,
                                       port=self.port,
                                       user=self.user,
                                       passwd=self.password,
                                       db=self.database,
                                       )
                cur = conn.cursor()
                if cur:
                    pyapilog().info(u"执行SQL语句%s" % sql)
                    # NOTE(review): this returns the value of execute(), not
                    # the fetched rows, unlike exec_mssql. Kept for
                    # compatibility; confirm which one callers expect.
                    return cur.execute(sql)
            except Exception as e:
                pyapilog().error(e)
            finally:
                if conn is not None:
                    conn.close()
    
    # 执行sql语句返回结果
    def execsql(sql):
        """Execute ``sql`` on the database described by ``setting.DATABASE``.

        The ``ENGINE`` entry selects the driver method: ``"MYSQL"`` uses
        ``sqldriver.exec_mysql`` and ``"MSSQL"`` uses ``sqldriver.exec_mssql``.
        Any other value is logged as an error.

        :param sql: statement to execute.
        :return: the result of the selected driver method, or None when the
            engine is not supported or an error was logged.
        """
        config = setting.DATABASE
        driver = config.get("ENGINE")

        # Engine name -> name of the sqldriver method that serves it.
        method_by_engine = {"MYSQL": "exec_mysql", "MSSQL": "exec_mssql"}
        method_name = method_by_engine.get(driver)
        if method_name is None:
            # NOTE(review): the message lists ORACLE, but only MYSQL and MSSQL
            # are handled here.
            pyapilog().error(u"[%s]数据库配置支持MYSQL、MSSQL、ORACLE" % driver)
            return None

        try:
            connection = sqldriver(
                host=config.get("HOST"),
                port=config.get("PORT"),
                user=config.get("USER"),
                password=config.get("PWD"),
                database=config.get("DATABASE")
            )
            return getattr(connection, method_name)(sql)
        except Exception as e:
            pyapilog().error(e)
            return None

    4、解析json字符串

    dataprase.py
    

      

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-21'
    import json
    import xmltodict
    from pyapilog import pyapilog
    
    # 解析json字符串
    class jsonprase(object):
        """Parse a JSON string and look up nodes with a slash-separated path.

        ``json_value`` holds the decoded document, or None when the input
        could not be decoded (the error is logged through ``pyapilog()``).
        """

        def __init__(self, json_value):
            """:param json_value: JSON text to decode."""
            try:
                self.json_value = json.loads(json_value)
            except (TypeError, ValueError) as e:
                pyapilog().error(e)
                # Always define the attribute so later lookups return None
                # instead of failing with AttributeError.
                self.json_value = None

        def find_json_node_by_xpath(self, xpath):
            """Walk the document along ``xpath`` and return what is found.

            Path segments are separated by ``/``; empty segments are ignored,
            so ``"/"`` (or an empty path) returns the whole document. When the
            current node is a list, the segment is looked up in every element
            and the results are collected in a list.

            :param xpath: path such as ``"/Point/Lat"``.
            :return: the node at that path, or None when a segment is missing.
            """
            elem = self.json_value
            for node in (part for part in xpath.strip("/").split("/") if part):
                if elem is None:
                    # A previous segment did not exist; nothing left to walk.
                    return None
                if isinstance(elem, list):
                    elem = [item.get(node) for item in elem]
                else:
                    elem = elem.get(node)
            return elem

        def datalength(self, xpath="/"):
            """Return ``len()`` of the node at ``xpath``; 0 when it is missing.

            :param xpath: path to measure, defaults to the document root.
            """
            node = self.find_json_node_by_xpath(xpath)
            return 0 if node is None else len(node)

        @property
        def json_to_xml(self):
            """The document as pretty-printed XML under a ``root`` element.

            :return: XML text, or None when a handled error was logged.
            """
            xml = None
            try:
                root = {"root": self.json_value}
                xml = xmltodict.unparse(root, pretty=True)
            # NOTE(review): ArithmeticError is what the original catches; it
            # looks like a slip for another exception type - confirm.
            except ArithmeticError as e:
                pyapilog().error(e)
            return xml
    
    # 解析xml字符串
    class xmlprase(object):
        """Hold an XML string and expose it converted to JSON text."""

        def __init__(self, xml_value):
            """:param xml_value: XML text to convert later."""
            self.xml_str = xml_value

        @property
        def xml_to_json(self):
            """The XML document converted to a JSON string.

            The XML is parsed with ``xmltodict.parse`` (UTF-8, namespaces
            processed) and the resulting mapping is dumped with
            ``json.dumps``.

            :return: JSON text, or None when parsing or dumping failed (the
                error is logged through ``pyapilog()``).
            """
            try:
                xml_dic = xmltodict.parse(self.xml_str,
                                          encoding="utf-8",
                                          process_namespaces=True,
                                          )
                return json.dumps(xml_dic)
            except Exception as e:
                # The original printed the error and then returned an unbound
                # local; log it like the sibling parser and return None.
                pyapilog().error(e)
                return None
    View Code

    5、还有配置文件差点忘记说了

    setting.py
    

      

    # -*-coding:utf-8 -*-
    # !/usr/bin/python
    __author__ = 'dongjie'
    __data__ = '2015-05-20'
    
    '''
        配置系统相关的参数,提供全局的相关配置
    '''
    import os
    import sys
    # Directory that contains this settings file. os.path.dirname is used
    # instead of splitting on '/', which breaks on Windows path separators and
    # yields an empty string for a file located in the filesystem root.
    root_dir = os.path.dirname(os.path.realpath(__file__))
    sys.path.append(root_dir)
    # Log level: 1 = notset, 2 = debug, 3 = info, 4 = warning, 5 = error,
    # 6 = critical
    logLevel = 2
    # Directory the log files are written to
    logFile = os.path.join(root_dir, 'logs')

    # Database configuration. The original comment lists MYSQL, MSSQL and
    # ORACLE as supported engines; execsql() in databasedriver.py only handles
    # MYSQL and MSSQL.
    DATABASE = {
        "ENGINE": "MSSQL",
        "HOST": "",
        "PORT": 3433,
        "USER": "",
        "PWD": "",
        "DATABASE": ""
    }
    

      6、最后看看我们的测试用例吧,当然是数据驱动了

    # -*-coding:utf-8 -*-
    from ddt import ddt, data, unpack
    import unittest
    from papi.httprequest import SendHttpRequest
    from papi.dataparse import jsonprase, xmlprase
    
    @ddt
    class TestSingleRequest(unittest.TestCase):
        """Data-driven tests that GET ``self.url`` with ``sid`` and ``count``."""

        def setUp(self):
            self.url = "http://xxxxxxxxxxxxxxxxxxx/api/xxxxxx"

        @data(
            (32351, 6),
            (9, 4)
        )
        @unpack
        def test_Single_right(self, sid, count):
            """Valid parameters: check coordinates, download URL and map size."""
            value = {"sid": sid, "count": count}
            # Named response_text so the ddt ``data`` decorator is not shadowed.
            response_text = SendHttpRequest(self.url).get(value)
            json_data = jsonprase(response_text)
            point_lat = json_data.find_json_node_by_xpath("/Point/Lat")
            point_lng = json_data.find_json_node_by_xpath("/Point/Lng")
            is_exists_map = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/IsExistsMap")
            size = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/Size")
            # unittest assertions instead of bare ``assert``: they are not
            # stripped under ``python -O`` and report the offending values.
            self.assertNotEqual(float(point_lat), 0)
            self.assertNotEqual(float(point_lng), 0)
            self.assertIsNotNone(
                json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/DownUrl"))
            # Explicit comparison kept from the original: the size is only
            # checked when the flag equals True.
            if is_exists_map == True:
                self.assertNotEqual(size, "")

        # Abnormal requests against the SingleRequest endpoint
        @data(
            ("abceeffffg", 6),
            (9, "")
        )
        @unpack
        def test_Single_error(self, sid, count):
            """Invalid parameters: the body must be the fixed error message."""
            value = {"sid": sid, "count": count}
            response_text = SendHttpRequest(self.url).get(value)
            self.assertEqual(response_text, u'{"Message":"请求无效。"}')
    
    @ddt
    class TourMaps(unittest.TestCase):
        """Data-driven tests for the XML responses below ``self.url``."""

        def setUp(self):
            self.url = "http://xxxxxx/api/TourMap"

        @data(32351, 9)
        def test_requests_online_xml(self, tourId):
            """Fetch the XML for ``tourId`` and check coordinates and children."""
            xml_url = self.url + "/%s" % tourId
            # Named response_text so the ddt ``data`` decorator is not shadowed.
            response_text = SendHttpRequest(xml_url).get()
            json_st = xmlprase(response_text).xml_to_json
            json_data = jsonprase(json_st)
            lng = json_data.find_json_node_by_xpath("/root/data/@lng")
            lat = json_data.find_json_node_by_xpath("/root/data/@lat")
            # unittest assertions instead of bare ``assert``: they are not
            # stripped under ``python -O`` and report the offending values.
            self.assertNotEqual(lng, "")
            self.assertNotEqual(lat, "")
            son_tour = json_data.find_json_node_by_xpath("/root/data/data")
            self.assertGreater(len(son_tour), 0)
    
    # The @ddt class decorator is required for @data / @unpack to generate
    # test methods; the original listing omitted it on this class.
    @ddt
    class TourData(unittest.TestCase):
        """Placeholder test case; every test body is still empty."""

        def setUp(self):
            self.url = "http://xxxxxx/api/xxx"

        @data(
            (),
            (),
            (),
        )
        @unpack
        def test_tourList_Location_in_open(self):
            pass

        def test_tourList_Location_not_open(self):
            pass

        def test_tour_open_city(self):
            pass
    
    
    if __name__ == "__main__":
        # loadTestsFromTestCase() accepts exactly one TestCase class, so each
        # class is loaded separately and the results are combined in one suite.
        loader = unittest.TestLoader()
        suite = unittest.TestSuite(
            [loader.loadTestsFromTestCase(case) for case in (TourMaps, TestSingleRequest)]
        )
        unittest.TextTestRunner(verbosity=2).run(suite)
    View Code

    关于python ddt可以参考https://ddt.readthedocs.org/en/latest/example.html

    测试结果生成,可以查看python nose相关文档,生成html

  • 相关阅读:
    mysql 远程登陆不上
    hdu 5339 Untitled【搜索】
    SqlServer 书目
    passwordauthentication yes
    oracle 11g RAC ocfs2
    Oracle 11g RAC database on ASM, ACFS or OCFS2
    CentOS ips bonding
    Oracle 11g RAC features
    openStack 王者归来之 trivial matters
    openstack windows 2008 img
  • 原文地址:https://www.cnblogs.com/clarke/p/5752574.html
Copyright © 2011-2022 走看看