最近的项目来了一个需求,要求测试tcp网关通讯协议;
1、液压井盖通过TCP/IP TCP与平台通信;
2、硬件定期发送心跳包(10S)给平台,是平台与硬件保持长连接;
3、每台硬件有一个12字节的唯一编码(字符型);
4、每台设备是1S发送一条报文;
最初使用NetAssist测试功能,模拟硬件设备发送报文,测试硬件设备发过来的状态。
功能测试通过后,新来的压测需求:要求对模拟60个左右的设备每隔一秒发送一条报文到平台,去百度Google搜索TCP压测怎么压测,这类文章博客比较少,试了有个博客自己写的一个软件,发现不能满足需求(都不能安装),其余就是jmeter进行压测,所以就尝试用jmeter进行测试,版本是3.2,按照博客所写进行操作,发现还是不能满足需求,并不能进行60个设备同时1S发送一条报文(加入定时器也不能满足),单个的模拟一个设备可以发送成功。
由于使用工具不能满足需求,只能依靠手写代码来实现了。
common.py 存放公用方法,如进制转化、异或运算等;
manholecover_state.py 结合数据组装报文;
run_test.py 主方法,发送报文,执行脚本;
test.log 日志文件
common.py
# coding=utf8 # import mysql.connector import binascii # 报文头 def reverseStart_field(): return "F5F5" # 返回序列 def reverseNumber_field(): return "0001" # 补全字节 def zfill(str1, i): str1 = str1.zfill(2 * i) # 补全 字节 return str1 # int转16进制 def intChange16(str1, i): str1 = hex(int(str1)) # 转16进制 str1 = str1.replace("0x", "") # 去掉16进制0x str1 = str1.zfill(2 * i) # 补全 字节 return str1 # 16进制 去掉0x 补全 字节 def strReplace(str1, i): str1 = str1.replace("0x", "") # 去掉16进制0x str1 = str1.zfill(2 * i) # 补全 字节 return str1 # int转16进制 倒序 def change16Reverse(str1, i): # print("==========调用方法change16Reverse") str1 = hex(int(str1)) # 转16进制 str1 = str1.replace("0x", "") # 去掉16进制0x str1 = str1.zfill(2 * i) # 补全 字节 # print("==========倒序之前:",str1) i = len(str1) str2 = "" while (i > 0): str2 = str2 + (str1[i - 2:i]) i = i - 2 # print("==========倒序之后:",str2) return str2 # 时间转16进制 def timeChange16(str1): arrTime = [] arrTime = str1.split('-') arrChange = [] strChange = "" i = 0 while (i < len(arrTime)): if (i == 0): print("----------") strChange = change16Reverse(arrTime[i], 2) else: strTem2 = intChange16(arrTime[i], 1) strChange = strChange + strTem2 i = i + 1 return strChange + "FF" # 字符串 转换 def changestr(str2): length = len(str2) utf8_length = len(str2.encode('utf-8')) # gbk 8.5 utf-8 9 length = int((utf8_length - length) / 2 + length) str2 = binascii.b2a_hex(str2.encode("gbk")) # encode("gbk") utf8 str2 = str(str2) # 将 byte 类型转换成 str 类型 str2 = str2.replace("b'", "").replace("'", "") # 去掉 b' str2 = str2.zfill(2 * length) return str2 # asc 转换 def changeascii(str1, aa): e = 0 # 暂存结果 for i in str1: d = ord(i) # 单个字符转换成ASCii码 e = e * 256 + d # 将单个字符转换成的ASCii码相连 a = hex(e) a = a.replace("0x", "") a = a.zfill(2 * aa) # print("结果是:%s" % a) return a if __name__ == "__main__": message_id = '0x10' case_id = "1"
manholecover_state.py
# -*-coding:utf-8-*- # coding=utf8 import mysql.connector import common import logging import datetime import pymysql def parseadd(case): config = { 'host': '127.0.0.1', # 连接的IP地址 'user': 'root', 'password': '123456', 'port': 3306, 'database': 'monitor', 'charset': 'utf8', # 编码格式,防止查出来的数据中文乱码 } #db1_cursor = mysql.connector.Connect(**config) # 连接数据库 db1_cursor = pymysql.Connect(**config) cur = db1_cursor.cursor() # 执行命令,接收结果 t_str = "select * from monitor.test_manholecover_state_copy where case_id=" try: cur.execute(t_str + str(case)) t_result = cur.fetchone() print(t_result) db1_cursor.close() print("-----------------------井盖监控点编码", t_result[5]) strElement0 = common.reverseStart_field() # 头 strElement1 = common.reverseNumber_field() # 序列 strElement2 = common.intChange16(t_result[4], 1) # 应答标志 # strElement3=common.changeascii(t_result[5],12)#设备编码 strElement3 = common.changeascii(t_result[5], 12) # 设备编码 logging.info("报文发送时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " 编码:" + t_result[5]) strElement4 = common.intChange16(t_result[6], 1) # 井盖状态 strElement5 = common.intChange16(t_result[7], 1) # 正在操作方式 strElement6 = common.intChange16(t_result[8], 3) # 告警状态 strElement6_1 = common.intChange16(t_result[9], 3) # 告警状态 strElement7 = common.intChange16(t_result[10], 1) # 控制授权 strElement8 = common.intChange16(t_result[11], 1) # 角度状态 strElement9 = common.zfill(t_result[12], 4) # 结束标记 # 拼接字符串 strElement = [strElement0, strElement1, strElement2, strElement3, strElement4, strElement5, strElement6, strElement6_1, strElement7, strElement8, strElement9] strResult = ''.join(strElement) # print("=====================") print("拼接报文:", strResult) return strResult except Exception as e: print("Error: %s" % e) if __name__ == "__main__": for id in range(1, 68): print(parseadd(1)) # db1_cursor = mysql.connector.connect(host='127.0.0.1', port='3306', user='root', password='123456', database='monitor', charset='utf8') # cur = db1_cursor.cursor() # case_id='1' # #parseadd(case_id) # db1_cursor.close()
run_test.py
# coding=utf-8 import datetime import logging import threading from socket import * from time import sleep import manholecover_state # import logging.handlers logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='test.log', filemode='w') logging.debug('debug message') logging.info('info message') logging.warning('warning message') logging.error('error message') logging.critical('critical message') socks = [] def singlesend(case, soc2): global lock lock.acquire() # case_id = 1 # while case_id<=2: #循环次数 print("报文发送时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) print("-----------------------井盖监控点编码", case) # , "上报次数:", case_id) send_manholecover_state = manholecover_state.parseadd(case) #print("----------------------", send_manholecover_state + ' ') soc2.send(bytes().fromhex(send_manholecover_state)) # case_id = case_id + 1 # print("报文发送时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # sleep(1) #可修改的上报时间间隔 lock.release() def message(): global lock lock.acquire() t_code = 201820190601 threads = [] # while (t_code <= 201820190667): for case in range(1, 68): t_singlesend = threading.Thread(target=singlesend, args=(str(case), socks[case-1])) threads.append(t_singlesend) # t_singlesend.setDaemon(True) # t_singlesend.start() # sleep(1) t_code += 1 for i in threads: i.setDaemon(True) i.start() lock.release() if __name__ == '__main__': lock = threading.Lock() end = datetime.datetime.now() + datetime.timedelta(days=2) # 获取结束时间 print("报文发送一周结束时间" + end.strftime("%Y-%m-%d %H:%M:%S") + ' ') for case in range(1, 68): soc = socket(AF_INET, SOCK_STREAM) soc.settimeout(300) # 设置超时时间 soc.connect(('192.168.0.156', 8845)) socks.append(soc) while True: message() now = datetime.datetime.now() sleep(1) if now >= end: break ''' threads = [] threads.append(t_singlesend) for t in threads: t.start() for t in threads: t.join() logging.info("全部执行完成~:%s" % ctime()) '''
运行结果:达到需求,每秒发送67条报文到平台
在这种正常场景下进行压测,连续跑了几个小时候,平台就崩掉了,java内存不断的升高;
开发进行不断的性能优化,已经连续跑了三四的情况下,平台已经走向稳定。