zoukankan      html  css  js  c++  java
  • Mock挡板测试

    工具一:python3 mock http请求

    1 执行(需使用 Python 3):nohup python3 httpserver.py > test.log 2>&1 &

    2 mock脚本

    httpserver.py

      1 import sys
      2 import importlib
      3 
      4 importlib.reload(sys)
      5 #sys.setdefaultencoding("utf-8")
      6 
      7 import socketserver
      8 import logging
      9 import os
     10 import platform
     11 import time
     12 # from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
     13 from http.server import BaseHTTPRequestHandler, HTTPServer
     14 
     15 import mock_utils
     16 
     logger = logging.getLogger("MockServer")
     logger.setLevel(level=logging.INFO)

     # Choose the log file location for the current platform.
     sysstr = platform.system()
     BASE_DIR = os.path.dirname(os.path.join(os.path.dirname(os.path.dirname(__file__)), os.path.pardir))

     if sysstr == "Linux":
         logFile = "/home/iotest/smartpotatologs/mockserver.log"
     else:
         # Windows and every other platform (e.g. macOS) log below BASE_DIR.
         # Previously only "Windows" and "Linux" were handled, so any other
         # platform hit a NameError on logFile, and the Windows path was built
         # without path separators.
         logFile = os.path.join(BASE_DIR, "logs", "mockserver.log")
         # FileHandler does not create missing parent directories.
         os.makedirs(os.path.dirname(logFile), exist_ok=True)

     # File handler that records messages of level INFO and above.
     handler = logging.FileHandler(logFile)
     handler.setLevel(logging.INFO)
     formatter = logging.Formatter(
         '%(asctime)s [%(threadName)s:%(thread)d] [%(name)s:%(lineno)d] [%(module)s:%(funcName)s] [%(levelname)s]- %(message)s')
     handler.setFormatter(formatter)
     logger.addHandler(handler)

     # Address the mock server binds to: all interfaces, port 8888.
     addr = ('0.0.0.0', 8888)
     39 
     40 
     class RequestHandler(BaseHTTPRequestHandler):
         """Answer HTTP requests with mock responses configured in MySQL.

         The 32 characters that follow the leading ``/`` of the request path are
         used as the lookup key in table ``smartpotato_mock_config``. A matching
         row is the tuple
         ``(mock_code, mock_header, mock_data, time_out, request_type)``.
         """

         # Echoed back when the request carries no Content-Type header.
         DEFAULT_CONTENT_TYPE = 'text/plain'

         def _mock_url(self):
             """Return the mock key taken from the request path (``path[1:33]``)."""
             return self.path[1:33]

         def _getmockdata(self):
             """Fetch the enabled mock configuration row for the requested URL.

             Returns:
                 The selected row, or ``None`` when no enabled row matches or
                 the database lookup fails (the failure is logged).
             """
             self.mock_method = None
             logger.info(str(self.client_address) + " " + str(self.command) + ' >>> 收到Mock请求,访问地址:' + str(self.path))
             mockurl = self._mock_url()
             print(mockurl)

             db = None
             try:
                 # NOTE(review): connection settings are hard-coded here; they
                 # should come from configuration rather than source code.
                 db = mock_utils.create_engine('172.20.201.98', 3306, 'root', 'as123456', 'test')
                 # The key comes straight from the request path, so it is bound
                 # as a query parameter instead of being formatted into the SQL.
                 sql = ('SELECT mock_code,mock_header,mock_data,time_out,request_type '
                        'FROM smartpotato_mock_config WHERE status=1 AND mock_url=%s')
                 with db.cursor() as cursor:
                     cursor.execute(sql, (mockurl,))
                     mockconfig = cursor.fetchone()
                 logger.info("查询mockconfig:" + str(mockconfig))
                 return mockconfig
             except Exception as e:
                 logger.error("查询mockconfig失败:" + str(e))
                 return None
             finally:
                 # One connection is opened per request; always release it.
                 if db is not None:
                     mock_utils.db_close(db)

         def _send_mock_error(self, status, data_type, payload):
             """Send *status* with *payload* rendered for *data_type*.

             ``mock_utils.result_error`` returns a str for JSON, bytes for XML
             and the untouched dict for any other content type, so the result is
             normalised to bytes before it is written.
             """
             body = mock_utils.result_error(data_type, payload)
             if isinstance(body, dict):
                 body = mock_utils.dict_to_json(body)
             if isinstance(body, str):
                 body = body.encode('utf-8')
             self.send_response(status)
             self.send_header('Content-type', data_type)
             self.end_headers()
             self.wfile.write(body)

         def _doresponse(self, mockconfig):
             """Write the mock response described by *mockconfig*.

             Args:
                 mockconfig: Row returned by ``_getmockdata`` or ``None``.

             A missing configuration yields a 404 error body; a configuration
             that cannot be turned into a response yields a 500 error body.
             """
             data_type = self.headers['Content-type'] or self.DEFAULT_CONTENT_TYPE
             print(data_type)

             if not mockconfig:
                 logger.error("查询无mockconfig,或状态不可用:" + str(self._mock_url()))
                 self._send_mock_error(404, data_type, {'result': '01,查询无mockconfig,或状态不可用。'})
                 return

             # Build the whole response before sending anything, so a failure
             # can still be reported with a clean 500 instead of a second
             # status line after headers were already written.
             try:
                 status = int(mockconfig[0])
                 extra_headers = {}
                 if mockconfig[1]:
                     # SECURITY NOTE(review): eval() executes whatever
                     # expression is stored in mock_header. Anyone able to write
                     # that column can run code in this process; consider
                     # storing JSON and parsing it instead.
                     extra_headers = eval(mock_utils.pre_deal_testdata(mockconfig[1]))
                 body = mock_utils.pre_deal_testdata(mockconfig[2]).encode('utf-8')
                 if mockconfig[3]:
                     # Configured delay (seconds) before answering.
                     time.sleep(mockconfig[3])
             except Exception as e:
                 logger.error("设置mock返回数据失败:" + str(e))
                 self._send_mock_error(500, data_type, {'result': '00,设置mock返回数据失败。'})
                 return

             self.send_response(status)
             for h in extra_headers:
                 self.send_header(h, extra_headers[h])
             self.send_header('Content-type', data_type)
             self.end_headers()
             logger.info("设置Mock返回。")
             self.wfile.write(body)

         def _respond_if_method_matches(self, method):
             """Serve the mock only when its configured request_type is *method*."""
             mockconfig = self._getmockdata()
             if mockconfig and mockconfig[4] != method:
                 self.send_error(404)
                 return
             # A missing configuration is reported by _doresponse (404 body).
             self._doresponse(mockconfig)

         def do_HEAD(self):
             """Handle HEAD: respond with the mock regardless of request_type."""
             self._doresponse(self._getmockdata())

         # Backward-compatible alias: the handler used to be named do_Head,
         # which BaseHTTPRequestHandler never dispatches to ('do_' + 'HEAD').
         do_Head = do_HEAD

         def do_GET(self):
             """Handle GET for mocks whose request_type is 'GET'."""
             self._respond_if_method_matches('GET')

         def do_POST(self):
             """Handle POST for mocks whose request_type is 'POST'.

             The request body is read and logged; it does not influence the
             response.
             """
             mockconfig = self._getmockdata()
             # A missing Content-Length is treated as an empty body.
             length = int(self.headers.get('content-length') or 0)
             datas = self.rfile.read(length).decode('utf-8', errors='replace')
             logger.info("data:" + str(datas))
             if mockconfig and mockconfig[4] != 'POST':
                 self.send_error(404)
                 return
             self._doresponse(mockconfig)
    130 
    131 # 支持多线程
    class ThreadingHttpServer(socketserver.ThreadingMixIn, HTTPServer):
        """HTTPServer combined with ThreadingMixIn so requests are handled in threads."""
    134 
    135 
    # Start the threaded mock server and serve until the process is stopped.
    try:
        server = ThreadingHttpServer(addr, RequestHandler)
        print("MockServer已经启动。")
        server.serve_forever()
    except Exception as e:
        # The message needs a %s placeholder: passing the exception as an extra
        # argument without one makes the logging call itself fail to format.
        # logger.exception also records the traceback.
        logger.exception("MockServer异常:%s", e)
    View Code

    mock_utils.py

      1 #!/usr/bin/python
      2 # -*- coding: UTF-8 -*-
      3 
      4 import sys
      5 import importlib
      6 import logging
      7 
      8 importlib.reload(sys)
      9 # sys.setdefaultencoding("utf-8")
     10 import json
     11 from xml.etree.ElementTree import Element, SubElement, tostring
     12 import random_utils
     13 
     14 
     def dict_to_xml(mydict):
         """Serialize *mydict* as children of an ``<xml>`` root element.

         Every top-level key becomes a child element. A value that provides
         ``.items()`` is expanded one level: each of its keys becomes a
         grandchild whose text is the string form of the value. Any other value
         becomes the text of the child itself. In both cases each ``:`` in the
         text is replaced by ``, ``.

         Args:
             mydict: Mapping of element names to values or nested mappings.

         Returns:
             The document as bytes, as produced by ``ElementTree.tostring``.
         """
         root = Element('xml')
         for tag, info in mydict.items():
             node = SubElement(root, tag)
             try:
                 nested = info.items()
             except AttributeError:
                 # Not a mapping: store the value as the element's text.
                 node.text = str(info).replace(':', ', ')
                 continue
             for key, val in nested:
                 SubElement(node, key).text = str(val).replace(':', ', ')
         return tostring(root)
     25 
     26 
     def dict_to_json(mydict):
         """Return *mydict* as a JSON string, or ``str(mydict)`` if it is not serializable.

         Only the errors ``json.dumps`` raises for unserializable or circular
         data are caught, so unrelated exceptions (e.g. KeyboardInterrupt) are
         no longer swallowed.
         """
         try:
             return json.dumps(mydict)
         except (TypeError, ValueError):
             return str(mydict)
     32 
     33 
     def result_error(str_type, mydict):
         """Render *mydict* according to the content type named by *str_type*.

         Args:
             str_type: Content-type value; it is converted with ``str()`` and
                 matched case-insensitively.
             mydict: Payload to render.

         Returns:
             ``dict_to_json(mydict)`` when the type mentions ``json``,
             ``dict_to_xml(mydict)`` when it mentions ``xml``, otherwise
             *mydict* itself, unchanged.
         """
         content_type = str(str_type).lower()
         # Substring test instead of ``find(...) > 0``, which ignored a match
         # at index 0 (e.g. a type of just "json").
         if 'json' in content_type:
             return dict_to_json(mydict)
         if 'xml' in content_type:
             return dict_to_xml(mydict)
         # NOTE(review): this branch returns the dict itself, not text; callers
         # that encode the result must handle that.
         return mydict
     41 
     42 
     def pre_deal_testdata(data_str):
         """Return ``str(data_str)`` with the supported ``$...`` placeholders filled in.

         Placeholders are grouped by prefix (``$RANDOM``, ``$TIME``, ``$DATE``).
         When a prefix occurs in the text, every generator of that group is
         called once, in the order listed, and all occurrences of its
         placeholder are replaced by that one value.
         """
         substitution_groups = (
             ('$RANDOM', (
                 ("$RANDOM_UUID", random_utils.create_uuid),
                 ("$RANDOM_MD5_UUID", random_utils.create_uuid_md5),
                 ("$RANDOM_EMAIL", random_utils.randomEmail),
                 ("$RANDOM_STR8", lambda: random_utils.randomStr(8)),
                 ("$RANDOM_STR16", lambda: random_utils.randomStr(16)),
                 ("$RANDOM_STR32", lambda: random_utils.randomStr(32)),
             )),
             ('$TIME', (
                 ("$TIME_NOW", random_utils.time_now),
                 ("$TIME_TODAY", random_utils.time_today),
                 ("$TIME_FUTURE", random_utils.time_future),
                 ("$TIME_NEXT_MONTH", random_utils.time_next_month),
             )),
             ('$DATE', (
                 ("$DATE_TODAY", random_utils.date_today),
                 ("$DATE_NEXT_MONTH", random_utils.date_next_month),
                 ("$DATE_MONTH_LATER", random_utils.date_month_later),
             )),
         )

         text = str(data_str)
         for prefix, substitutions in substitution_groups:
             if prefix not in text:
                 continue
             for placeholder, make_value in substitutions:
                 text = text.replace(placeholder, make_value())
         return text
     62 
     63 
     64 # -*- coding: utf-8 -*-
     65 import pymysql
     66 
     67 
     68 # 打开数据库连接
     def create_engine(host, port, user, password, dbname):
         """Open and return a pymysql connection to database *dbname*.

         Args:
             host: Server host name or address.
             port: Server TCP port.
             user: Login user name.
             password: Login password.
             dbname: Name of the database to select.
         """
         # ``database`` is the current keyword; ``db`` is a deprecated alias.
         return pymysql.connect(host=host, port=port, user=user, password=password, database=dbname)
     72 
     73 
     74 # 使用cursor()方法获取操作游标
     def exc_sql(db, sql, params=None):
         """Execute a data-changing statement and commit it.

         Args:
             db: Open DB-API connection. A falsy value makes the call a no-op.
             sql: Statement to execute.
             params: Optional parameters bound by the driver. Prefer this over
                 formatting values into *sql*.

         Returns:
             The cursor's ``rowcount`` on success. ``0`` when *db* is falsy or
             the statement fails (the transaction is rolled back and the error
             is logged).
         """
         if not db:
             # Previously this path left ``cursor`` unbound and then crashed
             # calling rollback() on the falsy connection.
             return 0
         cursor = db.cursor()
         try:
             if params is None:
                 cursor.execute(sql)
             else:
                 cursor.execute(sql, params)
             db.commit()
             return cursor.rowcount
         except Exception:
             logging.exception("exc_sql failed")
             db.rollback()
             return 0
         finally:
             cursor.close()
     88 
     89 
     90 # 使用cursor()方法获取操作游标
     def select_sql(db, sql, params=None):
         """Run a query and return its first row.

         Args:
             db: Open DB-API connection. A falsy value yields ``None``.
             sql: Query to execute.
             params: Optional parameters bound by the driver. Prefer this over
                 formatting values into *sql*.

         Returns:
             The first result row, or ``None`` when there is no row, *db* is
             falsy, or the query fails (the transaction is rolled back and the
             error is logged).
         """
         if not db:
             # Previously this path left ``cursor`` unbound and then crashed
             # calling rollback() on the falsy connection.
             return None
         cursor = db.cursor()
         try:
             if params is None:
                 cursor.execute(sql)
             else:
                 cursor.execute(sql, params)
             return cursor.fetchone()
         except Exception:
             logging.exception("select_sql failed")
             db.rollback()
             return None
         finally:
             cursor.close()
    104 
    105 
    def db_close(db):
        """Close the given connection by calling its ``close()`` method."""
        db.close()
    View Code

    random_utils.py

      1 # coding:utf-8
      2 
      3 import random
      4 import time
      5 import datetime
      6 import uuid,string
      7 import hashlib
      8 
      9 """
     10 产生随机参数,待需求确认需要哪些随机数据
     11 """
     12 
     13 
     def create_uuid():
         """Return a new ``uuid.uuid1()`` identifier as a string."""
         identifier = uuid.uuid1()
         return str(identifier)
     17 
     18 
     def create_uuid_md5():
         """Return the MD5 hex digest of the string form of the current ``time.time()``."""
         timestamp = bytes(str(time.time()), encoding='utf-8')
         return hashlib.md5(timestamp).hexdigest()
     25 
     def create_pwd_md5_complex(arg):
         """Return the MD5 hex digest of the fixed salt ``'jiayan'`` followed by *arg*.

         Both the salt and *arg* are encoded as UTF-8 before hashing.
         """
         salt = bytes('jiayan', encoding='utf-8')
         payload = bytes(arg, encoding='utf-8')
         return hashlib.md5(salt + payload).hexdigest()
     31 
     def randomStr(i):
         """Return a random string of *i* characters drawn from a-z, A-Z and 0-9.

         Characters are drawn with replacement, so any length is supported and
         characters may repeat. The previous ``random.sample`` implementation
         could never repeat a character and raised ValueError for lengths
         above 62.

         Raises:
             ValueError: If *i* is negative.
         """
         if i < 0:
             raise ValueError("length must be non-negative")
         alphabet = string.ascii_letters + string.digits
         return ''.join(random.choices(alphabet, k=i))
     35 
     36 
     def randomEmail():
         """Return a random address: nine letters/digits followed by ``@iottest.com``.

         The local part is two independent samples (4 and 5 characters) from
         a-z, A-Z and 0-9.
         """
         alphabet = string.ascii_letters + string.digits
         head = random.sample(alphabet, 4)
         tail = random.sample(alphabet, 5)
         return ''.join(head) + ''.join(tail) + "@iottest.com"
     41 
     42 
     def time_now():
         """Return the current local time formatted as ``YYYY-mm-dd HH:MM:SS``."""
         current = time.localtime()
         return time.strftime("%Y-%m-%d %H:%M:%S", current)
     46 
     47 
     def time_today():
         """Return today's local date followed by the literal time ``00:00:00``."""
         current = time.localtime()
         return time.strftime("%Y-%m-%d 00:00:00", current)
     51 
     52 
     def time_future():
         """Return the date 30 days from now followed by the literal time ``00:00:00``."""
         target = datetime.datetime.now() + datetime.timedelta(days=30)
         return target.date().isoformat() + " 00:00:00"
     58 
     59 
     def time_next_month():
         """Return the first day of next month followed by the literal time ``00:00:00``.

         December rolls over to January of the following year.
         """
         now = time.localtime()
         if now.tm_mon == 12:
             year, month = now.tm_year + 1, 1
         else:
             year, month = now.tm_year, now.tm_mon + 1
         # Keep the remaining struct_time fields as they are; only year, month
         # and day are replaced.
         shifted = (year, month, 1) + tuple(now)[3:]
         return time.strftime("%Y-%m-%d 00:00:00", shifted)
     72 
     73 
     def date_today():
         """Return today's local date formatted as ``YYYY-mm-dd``."""
         current = time.localtime()
         return time.strftime("%Y-%m-%d", current)
     77 
     78 
     def date_next_month():
         """Return the first day of next month formatted as ``YYYY-mm-dd``.

         December rolls over to January of the following year.
         """
         now = time.localtime()
         if now.tm_mon == 12:
             year, month = now.tm_year + 1, 1
         else:
             year, month = now.tm_year, now.tm_mon + 1
         # Keep the remaining struct_time fields as they are; only year, month
         # and day are replaced.
         shifted = (year, month, 1) + tuple(now)[3:]
         return time.strftime("%Y-%m-%d", shifted)
     91 
     92 
     def date_month_later():
         """Return the date 30 days from now formatted as ``YYYY-mm-dd``."""
         target = datetime.datetime.now() + datetime.timedelta(days=30)
         return target.date().isoformat()
     98 
     99 
    if __name__ == '__main__':
        # Smoke run: print one sample value from every generator, in order.
        samples = (
            create_uuid,
            create_uuid_md5,
            lambda: create_pwd_md5_complex('test'),
            lambda: randomStr(8),
            lambda: randomStr(16),
            lambda: randomStr(32),
            randomEmail,
            time_now,
            time_today,
            time_future,
            time_next_month,
            date_today,
            date_next_month,
            date_month_later,
        )
        for produce in samples:
            print(produce())
    View Code

     3 请求示例

    工具二:moco-runner

    1 jar包地址:https://github.com/dreamhead/moco

    2 配置参考:

       https://www.cnblogs.com/wzl0916/p/12678493.html

       https://www.cnblogs.com/bingoTest/p/11353575.html
     
    json配置文件: test.json
     1  
     2 [
     3   {
     4     "description":"moco test03",
     5     "request":{
     6         "uri":"/moco/test01"
     7     },
     8     "response":{
     9         "headers":{
    10             "Content-Type":"text/html;charset=utf8"
    11         },
    12         "text":"The first moco script!moco 第一个脚本。"
    13     }
    14   },
    15   {
    16     "description":"moco test03",
    17     "request":{
    18         "uri":"/moco/test02"
    19     },
    20     "response":{
    21         "headers":{
    22             "Content-Type":"text/html;charset=utf8"
    23         },
    24         "text":"The first moco script!moco第2个脚本。"
    25     }
    26   }
    27 ]
    View Code
    -------------------------------------------------------------------------------------------------
    带参数请求
     1 [  
     2 {
     3     "description":"moco test03",
     4     "request":{
     5         "method":"get",
     6         "uri":"/moco/test01",
     7         "queries":{
     8             "UserName":"wzl",
     9             "Password":"123456"
    10         }
    11     },
    12     "response":{
    13         "headers":{
    14             "Content-Type":"text/html;charset=utf8"
    15         },
    16         "text":"Login success"
    17     }
    18   }
    19 ]
    View Code
    -------------------------------------------------------------------------------------------------------
    响应读取外部文件
     1 [
     2   {
     3     "description":"moco test03",
     4     "request":{
     5         "method":"get",
     6         "uri":"/moco/test01",
     7         "queries":{
     8             "UserName":"wzl",
     9             "Password":"123456"
    10         }
    11     },
    12     "response":{
    13         "headers":{
    14             "Content-Type":"text/html;charset=utf8"
    15         },
    16         "file":"success_result.json"
    17     }
    18   }
    19 ]
    20  
    21 ----success_result.json
    22 {
    23     "status":"200",
    24     "msg":"Login Success",
    25     "data":{
    26         "UserName":"wzl",
    27         "Password":"123456"
    28     }
    29 }
    View Code
    ----------------------------------------------------------------------------------------------------------------------
     
    3 启动 nohup java  -jar  moco-runner-1.1.0-standalone.jar http -p  3001  -c test.json > /dev/null 2>&1 &
     
    4 实例-响应读取外部文件:
    配置
     1 [
     2   {
     3     "description":"moco big.data.list",
     4     "request":{
     5         "uri":"/data_subcribe_query"
     6     },
     7  
     8     "response":{
     9         "headers":{
    10             "Content-Type":"text/html;charset=utf8"
    11         },
    12         "file":"data_subcribe_query.json"
    13     }
    14   },
    15   {
    16     "description":"moco tag.info",
    17     "request":{
    18         "uri":"/data_file_get"
    19     },
    20  
    21     "response":{
    22         "headers":{
    23             "Content-Type":"text/html;charset=utf8"
    24         },
    25         "file":"data_file_get.json"
    26     }
    27   }
    28 ]
    View Code
    启动
    nohup java -jar  ./moco-runner-1.1.0-standalone.jar http -p 8899 -c uts_mock.json  > /dev/null 2>&1 &
  • 相关阅读:
    模糊查询(排除%等通配符并支持不连续关键字查询)
    ideal中运行manven常用操作
    ideal项目启动及问题
    FastJSON 转换List<T> ,Map<T,T>泛型失败 处理方法
    MySQL的SELECT ...for update
    CouchDB客户端开发—Java版
    Spring Data JPA 实例查询
    第一章 计算机网络概述
    第二章 物理层(二)
    Java常考面试题(一)
  • 原文地址:https://www.cnblogs.com/hzr-notes/p/12066289.html
Copyright © 2011-2022 走看看