zoukankan      html  css  js  c++  java
  • TCP/IP协议,TCP与平台通信,通讯协议压力测试(python)

    最近的项目来了一个需求,要求测试tcp网关通讯协议;

    1、液压井盖通过TCP/IP TCP与平台通信;

    2、硬件定期发送心跳包(10S)给平台,是平台与硬件保持长连接;

    3、每台硬件有一个12字节的唯一编码(字符型);

    4、每台设备是1S发送一条报文;

    最初使用NetAssist测试功能,模拟硬件设备发送报文,测试硬件设备发过来的状态。

    功能测试通过后,新来的压测需求:要求对模拟60个左右的设备每隔一秒发送一条报文到平台,去百度Google搜索TCP压测怎么压测,这类文章博客比较少,试了有个博客自己写的一个软件,发现不能满足需求(都不能安装),其余就是jmeter进行压测,所以就尝试用jmeter进行测试,版本是3.2,按照博客所写进行操作,发现还是不能满足需求,并不能进行60个设备同时1S发送一条报文(加入定时器也不能满足),单个的模拟一个设备可以发送成功。

    由于使用工具不能满足需求,只能依靠手写代码来实现了。

    common.py     存放公用方法,如进制转化、异或运算等;

    manholecover_state.py     结合数据组装报文;

    run_test.py     主方法,发送报文,执行脚本;

    test.log      日志文件

    common.py  

    # coding=utf8
    # import mysql.connector
    import binascii
    
    
    # 报文头
    def reverseStart_field():
        return "F5F5"
    
    
    # 返回序列
    def reverseNumber_field():
        return "0001"
    
    
    # 补全字节
    def zfill(str1, i):
        str1 = str1.zfill(2 * i)  # 补全 字节
        return str1
    
    
    # int转16进制
    def intChange16(str1, i):
        str1 = hex(int(str1))  # 转16进制
        str1 = str1.replace("0x", "")  # 去掉16进制0x
        str1 = str1.zfill(2 * i)  # 补全 字节
        return str1
    
    
    # 16进制 去掉0x 补全 字节
    def strReplace(str1, i):
        str1 = str1.replace("0x", "")  # 去掉16进制0x
        str1 = str1.zfill(2 * i)  # 补全 字节
        return str1
    
    
    # int转16进制 倒序
    def change16Reverse(str1, i):
        # print("==========调用方法change16Reverse")
        str1 = hex(int(str1))  # 转16进制
        str1 = str1.replace("0x", "")  # 去掉16进制0x
        str1 = str1.zfill(2 * i)  # 补全 字节
        # print("==========倒序之前:",str1)
        i = len(str1)
        str2 = ""
        while (i > 0):
            str2 = str2 + (str1[i - 2:i])
            i = i - 2
            # print("==========倒序之后:",str2)
        return str2
    
    
    # 时间转16进制
    def timeChange16(str1):
        arrTime = []
        arrTime = str1.split('-')
        arrChange = []
        strChange = ""
        i = 0
        while (i < len(arrTime)):
            if (i == 0):
                print("----------")
                strChange = change16Reverse(arrTime[i], 2)
            else:
                strTem2 = intChange16(arrTime[i], 1)
                strChange = strChange + strTem2
            i = i + 1
        return strChange + "FF"
    
    
    # 字符串 转换
    def changestr(str2):
        length = len(str2)
        utf8_length = len(str2.encode('utf-8'))  # gbk  8.5    utf-8  9
        length = int((utf8_length - length) / 2 + length)
        str2 = binascii.b2a_hex(str2.encode("gbk"))  # encode("gbk") utf8
        str2 = str(str2)  # 将 byte 类型转换成 str 类型
        str2 = str2.replace("b'", "").replace("'", "")  # 去掉 b'
        str2 = str2.zfill(2 * length)
        return str2
    
    
    # asc 转换
    
    def changeascii(str1, aa):
        e = 0  # 暂存结果
        for i in str1:
            d = ord(i)  # 单个字符转换成ASCii码
            e = e * 256 + d  # 将单个字符转换成的ASCii码相连
        a = hex(e)
        a = a.replace("0x", "")
        a = a.zfill(2 * aa)
        # print("结果是:%s" % a)
        return a
    
    
    if __name__ == "__main__":
        message_id = '0x10'
        case_id = "1"

    manholecover_state.py

    # -*-coding:utf-8-*-
    
    # coding=utf8
    import mysql.connector
    import common
    import logging
    import datetime
    import pymysql
    
    
    def parseadd(case):
        config = {
            'host': '127.0.0.1',  # 连接的IP地址
            'user': 'root',
            'password': '123456',
            'port': 3306,
            'database': 'monitor',
            'charset': 'utf8',  # 编码格式,防止查出来的数据中文乱码
        }
        #db1_cursor = mysql.connector.Connect(**config)  # 连接数据库
        db1_cursor = pymysql.Connect(**config)
    
        cur = db1_cursor.cursor()  # 执行命令,接收结果
        t_str = "select * from monitor.test_manholecover_state_copy where case_id="
        try:
            cur.execute(t_str + str(case))
            t_result = cur.fetchone()
            print(t_result)
            db1_cursor.close()
            print("-----------------------井盖监控点编码", t_result[5])
            strElement0 = common.reverseStart_field()  #
            strElement1 = common.reverseNumber_field()  # 序列
            strElement2 = common.intChange16(t_result[4], 1)  # 应答标志
            # strElement3=common.changeascii(t_result[5],12)#设备编码
            strElement3 = common.changeascii(t_result[5], 12)  # 设备编码
            logging.info("报文发送时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " 编码:" + t_result[5])
            strElement4 = common.intChange16(t_result[6], 1)  # 井盖状态
            strElement5 = common.intChange16(t_result[7], 1)  # 正在操作方式
            strElement6 = common.intChange16(t_result[8], 3)  # 告警状态
            strElement6_1 = common.intChange16(t_result[9], 3) # 告警状态
            strElement7 = common.intChange16(t_result[10], 1)  # 控制授权
            strElement8 = common.intChange16(t_result[11], 1)  # 角度状态
            strElement9 = common.zfill(t_result[12], 4)  # 结束标记
    
            # 拼接字符串
            strElement = [strElement0, strElement1, strElement2, strElement3, strElement4, strElement5, strElement6,
                          strElement6_1, strElement7, strElement8, strElement9]
            strResult = ''.join(strElement)
    
            # print("=====================")
            print("拼接报文:", strResult)
            return strResult
    
        except Exception as e:
            print("Error: %s" % e)
    
    
    if __name__ == "__main__":
    
        for id in range(1, 68):
            print(parseadd(1))
    
            # db1_cursor = mysql.connector.connect(host='127.0.0.1', port='3306', user='root', password='123456', database='monitor', charset='utf8')
            # cur = db1_cursor.cursor()
            # case_id='1'
            # #parseadd(case_id)
            # db1_cursor.close()

    run_test.py

    # coding=utf-8
    import datetime
    import logging
    import threading
    from socket import *
    from time import sleep
    import manholecover_state
    
    # import logging.handlers
    
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        filename='test.log',
                        filemode='w')
    logging.debug('debug message')
    logging.info('info message')
    logging.warning('warning message')
    logging.error('error message')
    logging.critical('critical message')
    socks = []
    
    
    def singlesend(case, soc2):
        global lock
        lock.acquire()
        # case_id = 1
        # while case_id<=2:   #循环次数
        print("报文发送时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        print("-----------------------井盖监控点编码", case)  # , "上报次数:", case_id)
        send_manholecover_state = manholecover_state.parseadd(case)
        #print("----------------------", send_manholecover_state + '
    ')
    
        soc2.send(bytes().fromhex(send_manholecover_state))
    
        # case_id = case_id + 1
        # print("报文发送时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        # sleep(1) #可修改的上报时间间隔
        lock.release()
    
    
    def message():
        global lock
        lock.acquire()
        t_code = 201820190601
        threads = []
        # while (t_code <= 201820190667):
        for case in range(1, 68):
            t_singlesend = threading.Thread(target=singlesend, args=(str(case), socks[case-1]))
            threads.append(t_singlesend)
            # t_singlesend.setDaemon(True)
            # t_singlesend.start()
            # sleep(1)
            t_code += 1
    
        for i in threads:
            i.setDaemon(True)
            i.start()
        lock.release()
    
    
    if __name__ == '__main__':
        lock = threading.Lock()
    
        end = datetime.datetime.now() + datetime.timedelta(days=2)  # 获取结束时间
        print("报文发送一周结束时间" + end.strftime("%Y-%m-%d %H:%M:%S") + '
    ')
        for case in range(1, 68):
            soc = socket(AF_INET, SOCK_STREAM)
            soc.settimeout(300)  # 设置超时时间
            soc.connect(('192.168.0.156', 8845))
            socks.append(soc)
    
        while True:
            message()
            now = datetime.datetime.now()
            sleep(1)
            if now >= end:
                break
    
    '''
    threads = []
    threads.append(t_singlesend)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
        logging.info("全部执行完成~:%s" % ctime())
     '''

    运行结果:达到需求,每秒发送67条报文到平台

    在这种正常场景下进行压测,连续跑了几个小时候,平台就崩掉了,java内存不断的升高;

    开发进行不断的性能优化,已经连续跑了三四的情况下,平台已经走向稳定。

  • 相关阅读:
    POJ 1328 Radar Installation
    POJ 1700 Crossing River
    POJ 1700 Crossing River
    poj 3253 Fence Repair (贪心,优先队列)
    poj 3253 Fence Repair (贪心,优先队列)
    poj 3069 Saruman's Army(贪心)
    poj 3069 Saruman's Army(贪心)
    Redis 笔记与总结2 String 类型和 Hash 类型
    数据分析方法有哪些_数据分析方法
    数据分析方法有哪些_数据分析方法
  • 原文地址:https://www.cnblogs.com/hemingwei/p/11150917.html
Copyright © 2011-2022 走看看