zoukankan      html  css  js  c++  java
  • Python开发测试工具案例分享⑦——老化测试实现代码

    Main_LH.py
    # -*- coding: utf-8 -*-
    from PyQt4 import QtCore
    import os,time,socket,datetime,codecs
    import thread
    import paramiko
    import ConfigParser
    import sys
    import gc
    reload(sys)
    sys.setdefaultencoding('utf8')
     
     
    #ssh登录
    class ssh():
        """Interactive SSH shell session to one host.

        Wraps a paramiko ``SSHClient`` (``self.s``) and the channel returned
        by ``invoke_shell()`` (``self.ssh``) so callers can send command
        lines and read back raw shell output.
        """

        def __init__(self,host,username,passwd):
            """Connect to *host* on port 22 with a password and open a shell.

            Unknown host keys are accepted automatically (AutoAddPolicy).
            Connection errors raised by paramiko propagate to the caller.
            """
            self.s = paramiko.SSHClient()
            self.s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.s.connect(hostname=host,port=22, username=username, password=passwd,timeout = 30)
            self.ssh = self.s.invoke_shell()
            time.sleep(2)       # pause before the shell is first used

        def Send(self,Command):
            """Send *Command* terminated by a newline, then wait one second."""
            self.ssh.send(str(Command) + '\n')
            time.sleep(1)

        def Recv(self,Buff_Size,click_time):
            """Read up to *Buff_Size* bytes, waiting at most *click_time* seconds.

            ``Channel.recv`` only accepts the byte count, so the wait limit is
            applied through ``settimeout``.  If nothing arrives in time, an
            empty line is sent and the read is attempted once more; a second
            timeout propagates to the caller as ``socket.timeout``.
            """
            self.ssh.settimeout(click_time)
            try:
                buff = self.ssh.recv(Buff_Size)
            except socket.timeout:
                self.Send("\n")
                buff = self.ssh.recv(Buff_Size)
            return buff

        def Close(self):
            """Close the underlying SSH client."""
            self.s.close()
     
    #获取配置文件信息
    class Config_ini():
        """Thin wrapper around ``ConfigParser.ConfigParser`` for one INI file."""

        def __init__(self,filename):
            """Parse *filename*; an error from ``open`` propagates to the caller."""
            self.config = ConfigParser.ConfigParser()
            # Close the handle as soon as parsing finishes instead of leaking it.
            with open(filename) as ini_file:
                self.config.readfp(ini_file)

        def get_info(self,session,key):
            """Return the value of *key* in section *session*."""
            return self.config.get(session,key)

        def session(self):
            """Return the list of section names."""
            return self.config.sections()

        def option(self,session):
            """Return the option names defined in section *session*."""
            return self.config.options(session)

        def set(self,session,option,value):
            """Set *option* in section *session* to *value* (in memory only)."""
            return self.config.set(session,option,value)
     
    #log信息保存
    class Consumer(QtCore.QThread):
        """QThread subclass that writes queued log text to a per-unit log file."""

        def __init__(self,queue,parent = None):
            super(Consumer,self).__init__(parent)
            self.data = queue       # source of log text; must provide get()

        def write_log(self,PD,SN,Time):
            """Append every item taken from the queue to the unit's log file.

            The file is ``<cwd>/log/<PD>_aging_<SN>_<Time>.log``, encoded as
            gb18030; the ``log`` directory is created when missing.  The loop
            has no exit condition, so this call does not return.
            """
            log_dir = os.path.join(os.getcwd(), "log")
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)
            self.data_file = os.path.join(log_dir, PD + "_aging_" + SN + "_" + Time + ".log")
            while True:
                s = self.data.get()
                # Reopen per item so each message is on disk once written,
                # and let the context manager close the file even on error.
                with codecs.open(self.data_file, "a+",encoding='gb18030') as F:
                    F.write(s)
     
    #主测试程序
    class Main_Test(QtCore.QThread):
        def __init__(self,queue,parent = None):
            """Store the log queue, button style sheets and the aging duration.

            Reads ``ini/Paramiters.ini`` through ``Config_ini`` and keeps the
            ``[GNS] aging_time`` value as an integer.
            """
            super(Main_Test,self).__init__(parent)
            # Style sheet strings later emitted through the 'color' signal.
            self.Green = "QPushButton{background-color:GREEN}"
            self.Yellow = "QPushButton{background-color:YELLOW}"
            self.Red = "QPushButton{background-color:RED}"
            # Run-state flags and the queue that receives log text.
            self.working = True
            self.isWait = True
            self.data = queue
            # The GNS "Server_IP" option is deliberately not loaded here; only
            # the disabled Rsync step refers to it.
            self.Config_ini = Config_ini("ini/Paramiters.ini")
            aging_setting = self.Config_ini.get_info("GNS", "aging_time")
            self.GNS_aging_time = int(aging_setting)
     
        # 获取本地ip
        def Local_IP(self):
            """Return the address that this machine's host name resolves to."""
            host_name = socket.gethostname()
            return socket.gethostbyname(host_name)
     
        #连接设备
        def Connection(self,host):
            """Open the SSH shell to *host* and send an empty line to it.

            The session is stored in ``self.Connect``.  Any exception raised
            while connecting is reported through ``Test_Fail`` with the
            exception text as the message.
            """
            username = "gns"
            passwd = "feitian"
            try:
                self.Connect = ssh(str(host),username,passwd)
                # An empty line makes the shell print a prompt to wait for.
                self.Send_Command("\n")
            except Exception as e:
                self.Test_Fail(str(e))
     
        #发送接收数据
        def Send_Command(self,Command,Prompt='#',Timeout=10,click_time=1):
            """Send *Command* and collect output until *Prompt* shows up.

            Command   -- text handed to ``self.Connect.Send``.
            Prompt    -- marker that ends the wait once it is in the output.
            Timeout   -- seconds allowed after sending before ``Test_Fail``.
            click_time -- passed through to ``self.Connect.Recv``.

            Every chunk read is also put on ``self.data``.  Returns all output
            collected for this command.  Any exception while sending or
            reading is reported through ``Test_Fail``.
            """
            try:
                log = ""
                self.Connect.Send(Command)
                time.sleep(1)
                runtime = datetime.datetime.now()
                # Search the accumulated output, not just the latest chunk, so
                # a prompt split across two reads is still recognised.
                while Prompt not in log:
                    time.sleep(3)
                    buff = self.Connect.Recv(999999,click_time)
                    log += buff
                    self.data.put(buff)
                    # total_seconds() keeps the day component that the
                    # .seconds attribute silently drops.
                    elapsed = (datetime.datetime.now() - runtime).total_seconds()
                    if elapsed > Timeout:
                        self.Test_Fail(u"超时, %s 不能找到" % Prompt)
                        break
                return log
            except Exception:
                self.Test_Fail(u"%s不能找到" % Prompt)
     
        #记录结果
        def Save_Result(self,SN,Status,Time):
            """Append one result line to ``<cwd>/Record/Upgrade.log``.

            The line is a newline followed by
            ``<PD> <SN> <Status> , Test time : <Time>``; the ``Record``
            directory is created when missing.  Returns None on success, or
            the exception text if anything fails (nothing is raised).
            """
            try:
                record_dir = os.path.join(os.getcwd(), "Record")
                if not os.path.exists(record_dir):
                    os.makedirs(record_dir)
                entry = "\n" + self.PD + " " + str(SN) + " " + Status + " , Test time : " + str(Time)
                # Append mode always writes at the end of the file, so no
                # explicit seek is needed; "with" closes the file on error too.
                with open(os.path.join(record_dir, 'Upgrade.log'), "a+") as F:
                    F.write(entry)
            except Exception as e:
                return str(e)
     
        #通过开头和结尾字符串获取中心字符串
        def GetMiddleStr(self,content,startStr,endStr):
            """Return the stripped text between *startStr* and the next *endStr*.

            The end marker is searched only after the start marker, so an
            earlier occurrence of *endStr* (for example a shell prompt ahead
            of the echoed command) cannot produce an empty result.  If either
            marker is missing, the error text is passed to ``Test_Fail``.
            """
            try:
                startIndex = content.index(startStr) + len(startStr)
                endIndex = content.index(endStr, startIndex)
                return content[startIndex:endIndex].strip()
            except Exception as e:
                self.Test_Fail(str(e))
     
        #设置颜色
        def Set_Color(self,message):
            """Emit the old-style 'color' signal carrying *message*."""
            color_signal = QtCore.SIGNAL('color')
            self.emit(color_signal,message)
     
        #设置状态
        def Set_Status(self,message):
            """Emit the old-style 'status' signal carrying *message*."""
            status_signal = QtCore.SIGNAL('status')
            self.emit(status_signal,message)
     
        #测试通过提示
        def Test_Pass(self,message):
            """Mark the run as passed and publish *message*.

            Clears ``self.working``, records a " Pass" line for the current
            SN, queues the message for logging and emits the green colour and
            the status text.
            """
            self.working = False
            self.Save_Result(self.SN," Pass",self.Time)         # record the pass result
            self.data.put(message)                              # queue the message for the log
            self.Set_Color(self.Green)                          # a pass is shown in green
            self.Set_Status(message)                            # status text for the button
     
        #测试失败提示
        def Test_Fail(self,message):
            """Mark the run as failed, publish *message* and end this thread.

            Clears ``self.working``, increments ``self.error_count``, records
            a " Fail" line for the current SN, emits the red colour, and
            publishes the message together with the elapsed time since
            ``self.starttime``.  Finally calls ``thread.exit_thread()``, which
            raises SystemExit, so control does not return to the caller.
            """
            self.working = False
            self.error_count = self.error_count + 1
            self.Save_Result(self.SN, " Fail", self.Time)       # record the failure
            self.Set_Color(self.Red)                            # a failure is shown in red
            Faillentime = datetime.datetime.now()
            second = (Faillentime - self.starttime).total_seconds()
            Time = u"测试时间:" + self.Running_time(second)     # elapsed time so far
            self.Set_Status(message + "\n" + Time)              # status text for the button
            self.data.put(message + "  " + Time)                # queue the message for the log
            thread.exit_thread()                                # stop the calling thread
     
        #测试开始提示
        def Test_Running(self,message):
            """Show *message* as the current step, with the yellow colour."""
            self.Set_Color(self.Yellow)         # a running step is shown in yellow
            self.Set_Status(message)
     
        #运行时间
        def Running_time(self,second):
            """Format a duration in seconds as ``<h>h:<m>m:<s>s``.

            Each field is truncated to an integer; hours are not capped at 24.
            """
            hours = int(second / 3600)
            remainder = second - hours * 3600
            minutes = int(remainder / 60)
            seconds = int(remainder - minutes * 60)
            return "%dh:%dm:%ds" % (hours, minutes, seconds)
     
        #程序运行
        def Script_Start(self,Value,PD,SN,Host,Time,Remote_IP = None):
            """Entry point of one test run: log in, then sync or run the aging test.

            Value     -- 0 selects the data-sync path, anything else the aging test.
            PD, SN    -- product name and serial number used in result records.
            Host      -- address handed to ``Connection``.
            Time      -- timestamp text written into result records.
            Remote_IP -- optional address passed on to ``BI``.
            """
            self.working = True
            self.PD = PD
            self.SN = SN
            self.Host = Host
            self.Value =Value
            self.Time=Time
            self.error_count = 0
            self.starttime = datetime.datetime.now()        # local start time
            self.endtime = self.starttime + datetime.timedelta(hours=self.GNS_aging_time)        # planned end time
            self.Test_Running(u'登录设备')           # show the "logging in" status
            self.Connection(self.Host)              # connect to the device
            time.sleep(5)
            if self.Value == 0 :
                # NOTE(review): Rsync is currently disabled (its definition sits
                # inside a string literal in this class), so this branch would
                # raise AttributeError -- confirm before using Value == 0.
                self.Rsync()                        # synchronise data
                if self.error_count == 0:
                    self.Test_Pass(u"同步成功")
            else:
                self.BI(Remote_IP)                  # aging test
     
        '''
        #同步数据
        def Rsync(self):
            self.Test_Running(u"同步数据")
            rusername = 'safi'
            Rsync_IP = self.GNS_Server_IP
            self.Send_Command("cd /opt")
            self.Send_Command("touch feitian ")
            self.Send_Command('echo "feitian"  > feitian ')
            self.Send_Command('chmod 600 feitian ')
            log = self.Send_Command("ping " + Rsync_IP + " -c5" , "#" , 20)
            if "100% packet loss" in log:
                self.Test_Fail(u"服务器不能Ping通")
            else:
                log = self.Send_Command("rsync -auvzP --delete --password-file=feitian " + rusername + '@' + Rsync_IP + "::GNS /opt&","total size",72000)
                if "rsync error" in log:
                    self.Test_Fail(u"rsync 同步失败")
                else :
                    self.Send_Command('chmod 777 Local_deploy_gns.sh', "#", 60)
                    self.Send_Command('./Local_deploy_gns.sh install',"#",60)
        '''
     
        #老化测试
        def BI(self,Remote_IP):
            """Run the aging test until ``self.endtime`` or the first failure.

            Prepares the local Windows host (starts an iperf server if none is
            running, disables the firewall, enables the W32Time NTP server and
            restarts the service), runs the one-off device checks, then
            repeats the periodic checks.  *Remote_IP* enables the network
            speed check when it is not None.
            """
            iperf = os.popen('tasklist|find /c "iperf.exe"')    # count running iperf processes
            iperf_number = iperf.read()
            iperf.close()
            if int(iperf_number) < 1:
                os.system("start /b iperf.exe -s -w1m&")        # run iperf in the background
            os.system('netsh firewall set opmode disable')      # turn the Windows firewall off
            # Registry keys are written as raw strings so the path separators
            # reach "reg add" intact.
            os.system(
                r'reg add HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\TimeProviders\NtpServer /v Enabled /t REG_DWORD /d 1 /f')
            os.system(
                r'reg add HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\Config /v AnnounceFlags /t REG_DWORD /d 5 /f')
            os.system("net stop w32time & net start w32time")   # restart the Windows time service
            self.Test_Running(U"老化测试")           # show the "aging test" status
            self.Check_Process()                    # start the CPU load on the device
            self.VersionCheck()                     # version check
            self.MAC_check()                        # MAC address check
            while self.working:
                time.sleep(5)
                self.ShowTemperature()              # temperature check
                self.Clock_Test()                   # clock check
                self.MemeryCheck()                  # memory check
                #self.Discrete()                     # discrete signal check (disabled)
                if Remote_IP is not None:
                    self.EthSpeedCheck(Remote_IP)   # network speed check
                self.SSDCheck()                     # disk check
                time.sleep(20)
                self.Clean_Caches()                 # drop caches on the device
                Finish_time = datetime.datetime.now()
                if Finish_time >= self.endtime:
                    self.Test_Pass(u"老化测试成功")
                    break
                gc.collect()
     
        #清除内存
        def Clean_Caches(self):
            """Ask the device to drop its caches, then print memory usage."""
            self.Test_Running(u"清除内存")
            # Writing 3 to drop_caches is sent as a background command.
            self.Send_Command("echo 3 >> /proc/sys/vm/drop_caches&")
            time.sleep(3)
            self.Send_Command("free -m")
            self.Send_Command('cd /')
     
        #测试过程提升CPU利用率
        def Check_Process(self):
            """Start the CPU load script unless at least two kcpu processes are counted.

            The count is parsed from the output of ``ps -A | grep -c kcpu``:
            the text between the echoed "kcpu" and the next "[".
            """
            count_output = self.Send_Command("ps -A | grep -c kcpu")
            kcpu_count = int(self.GetMiddleStr(count_output, "kcpu", "["))
            if kcpu_count < 2:
                for command in ("cd /sbin", "./run_kcpu.sh start 80", "cd /"):
                    self.Send_Command(command)
     
        #MAC地址检测
        def MAC_check(self):
            """Fail the run unless the eth0 line of ifconfig contains "5C:E0:CA"."""
            self.Test_Running(u"MAC地址检测")
            eth0_info = self.Send_Command("ifconfig | grep eth0")
            expected_prefix = "5C:E0:CA"
            if expected_prefix not in eth0_info:
                self.Test_Fail(u"MAC地址格式错误")
     
        #时钟检测
        def Clock_Test(self):
            """Sync the device clock from this host and check the reported year.

            Stops ntpd on the device, runs ntpdate against the local address
            and then compares only the current local year with the text
            returned by ``date``.
            """
            self.Test_Running(u"NTP检测")
            self.Send_Command("service ntpd stop")
            self.Send_Command("ntpdate %s &" % self.Local_IP())
            time.sleep(10)
            date = self.Send_Command("date")            # time reported by the device
            clock = str(datetime.datetime.now().year)   # current local year as text
            # NOTE(review): only the year is compared, so a clock that is off
            # by less than a year still passes.
            if clock not in date:
                self.Test_Fail(u"时钟错误")
     
        #网口速率测试
        def EthSpeedCheck(self,Remote_IP):
            """Ping *Remote_IP* from the device, then run a 30 second iperf client.

            Fails when the ping output contains "Host Unreachable" or the
            iperf output contains "Broken pipe".  After the iperf run the
            ``del_time`` command is sent to the device.
            """
            self.Test_Running(u"网口速率检测")
            ping_output = self.Send_Command("ping %s -c5" % Remote_IP)
            if "Host Unreachable" in ping_output:
                self.Test_Fail(u"Iperf 服务器地址不能Ping通")
                return
            iperf_command = "iperf -c %s -w1m -i1 -t30 | grep '0.0-3'" % Remote_IP
            iperf_output = self.Send_Command(iperf_command, "#", 60,10)
            if "Broken pipe" in iperf_output:
                self.Test_Fail(u"Iperf 服务器不通")
            self.Send_Command("del_time")
     
        #温度检测
        def ShowTemperature(self):
            """Fail the run when either core temperature reading exceeds 96.

            The readings are taken from ``sensors -u`` output: the text
            between ``temp2_input:`` / ``temp2_max:`` for core 0 and between
            ``temp3_input:`` / ``temp3_max:`` for core 1.
            """
            self.Test_Running(u"温度检测")
            temp = self.Send_Command("sensors -u &","Done",20)
            Core0 = self.GetMiddleStr(str(temp), 'temp2_input:', "temp2_max:")
            if float(Core0) > 96:
                self.Test_Fail(u"CPU Core 0 温度超过96℃,当前温度:%s℃" % Core0)
            Core1 = self.GetMiddleStr(str(temp), 'temp3_input:', "temp3_max:")
            if float(Core1) > 96:
                # Report the core 1 reading itself, not the core 0 value.
                self.Test_Fail(u"CPU Core 1 温度超过96℃,当前温度:%s℃" % Core1)
     
        #内存检测
        def MemeryCheck(self):
            """Fail the run when the total memory reported by ``free`` is below 8126000.

            The value is the text between the echoed awk program and the next
            "[" in the command output.
            """
            self.Test_Running(u"内存检测")
            self.Send_Command("iostat -m")
            free_output = self.Send_Command("free | grep Mem | awk '{print $2}'", "#", 30)
            mem_total = self.GetMiddleStr(str(free_output), "}'", "[")
            mem_total = mem_total.strip()
            if float(mem_total) < 8126000:
                self.Test_Fail(u"内存 <8G")
     
        #版本检测
        def VersionCheck(self):
            """Print the device's version information and fail on "failed" in vershow.

            Each query is sent as a background command and waited on until
            "Done" appears in the output.
            """
            self.Test_Running(u"版本检测")
            self.Send_Command("vershow | grep Config | awk '{print $2}'",'#',10)
            info_commands = (
                'fpga_version&',
                'swv_read&',
                'spn_read&',
                'dpn_read&',
                'board_pn_read&',
                'mod_read&',
                'uname -a&',
                'dsn_read&',
                'board_sn_read&',
                'fpgapn_read&',
                'isopn_read&',
            )
            for info_command in info_commands:
                self.Send_Command(info_command,"Done",30)
            vershow_output = self.Send_Command("vershow&","Done",60)
            if "failed" in vershow_output:
                self.Test_Fail(u"版本未完全升级")
     
        #硬盘检测
        def SSDCheck(self):
            """Check disk detection, partition counts and capacity, then stress the disk.

            Fails the run when no SATA controller is listed, when the sda
            partition counts do not match, or when a disk reports less than
            950 GB.  For product "GNS" a second disk (sdb) is also required.
            """
            self.Test_Running(u"硬盘检测")
            if 'SATA' not in self.Send_Command('lspci'):
                self.Test_Fail(u"硬盘模块识别失败")
            num1 = self.Send_Command("fdisk -l | grep -c sda")
            num2 = self.Send_Command("df | grep -c sda")
            # Substring tests on the whole captured output (echo, count and
            # prompt), not a comparison of the parsed counts.
            if "7" in num1 and "4" in num2:
                GNS_DISK = self.Send_Command('fdisk -l | grep sda')
                sda_size = self.GetMiddleStr(GNS_DISK, '/dev/sda:', "GB")
                if float(sda_size) < 950:
                    self.Test_Fail(u"第一硬盘容量 <950GB")
                if self.PD == 'GNS':
                    GNS_DISK = self.Send_Command('fdisk -l | grep sdb')
                    if '/dev/sdb' in GNS_DISK:
                        sdb_size = self.GetMiddleStr(str(GNS_DISK), 'Disk /dev/sdb: ', "GB,")
                        if float(sdb_size) < 950:
                            self.Test_Fail(u"第二硬盘容量 <950GB")
                    else:
                        self.Test_Fail(u"未识别到第二硬盘")
                # Only start a new write/read pass when fewer than two "dd"
                # matches are counted in the process list.
                if int(self.GetMiddleStr(self.Send_Command("ps -A | grep -c dd"), "dd", "[")) < 2:
                    self.Send_Command('dd bs=16M count=1024 if=/dev/zero of=test conv=fdatasync &')
                    time.sleep(400)
                    self.Send_Command('dd bs=16M if=test of=/dev/null &')
                    time.sleep(400)
                self.Send_Command("rm -rf test &")
                time.sleep(30)
            else:
                self.Test_Fail(u"硬盘分区未识别")
            time.sleep(30)
     
        #离散量测试
        def Discrete(self):
            """Toggle the discrete outputs low/high/low, then run the 429 check.

            After each toggle, 13 digits are sampled from the
            ``arinc  get_signalstatusmatrix`` output and combined as an
            integer.  The 429 part sets the bit rates on both hi3593 chips and
            expects a fixed marker in each chip's send-message output.
            """
            self.Test_Running(u"离散量检测")
            self.Send_Command("arinc set_control_off")
            low = self._discrete_sample("low")
            high = self._discrete_sample("high")
            log1=low+high
            low = self._discrete_sample("low")
            log2=high+low
            # NOTE(review): with "and" the run fails only when BOTH sums are
            # wrong; "or" may have been intended -- confirm before changing.
            if log1!=1111111111111 and log2!=1111111111111:
                self.Test_Fail('Discrete Check FAIL')

            # 429 check
            self.Test_Running(u"429检测")
            bit_rate_commands = (
                "hi3593_c0_cfg setRx1BitRate  high\n",
                "hi3593_c0_cfg setRx2BitRate  high\n",
                "hi3593_c0_cfg setTxBitRate high\n",
                "hi3593_c1_cfg setRx1BitRate  high\n",
                "hi3593_c1_cfg setRx2BitRate  high\n",
                "hi3593_c1_cfg setTxBitRate high\n",
            )
            for bit_rate_command in bit_rate_commands:
                self.Send_Command(bit_rate_command)

            # NOTE(review): "netlink_u_self.APP" looks like the result of a
            # search-and-replace on "netlink_u_app" (the name started below);
            # left unchanged pending confirmation.
            if int(self.GetMiddleStr(self.Send_Command("ps | grep -c netlink_u_self.APP"), "self.APP", "[")) < 1:
                self.Send_Command("netlink_u_app &\n", '#', 3)
            c = self.Send_Command("hi429_sendmsg_user_chip0 123 3\n")
            if "0x42910001" not in c:
                self.Test_Fail(u"429 CHIP0 测试失败")

            d = self.Send_Command("hi429_sendmsg_user_chip1 456 3\n")
            if "0x42920001" not in d:
                self.Test_Fail(u"429 CHIP1 测试失败")

        def _discrete_sample(self,level):
            """Set discrete outputs 1-16 to *level* and return the sampled digits as an int."""
            for i in range(1,17):
                self.Send_Command("hi8435_cfg wrDiscOut %s %s" % (str(i), level))
            matrix = self.Send_Command("arinc  get_signalstatusmatrix", '#', 5)
            matrix = self.GetMiddleStr(matrix,'get_signalstatusmatrix','[')
            # Character positions of the 13 status digits within the output.
            positions = (3, 7, 11, 15, 19, 23, 27, 31, 35, 39, 44, 49, 54)
            return int("".join([matrix[position] for position in positions]))
    

      

    —————————————————————————————— 选择正确的事、再把事做正确 ——————————————————————————————
  • 相关阅读:
    内存-程序运行的空间
    数据在内存中是这样存储的(二进制形式存储)
    从编写源代码到程序在内存中运行的全过程解析
    QT开发工具
    Linux中Too many open files 问题分析和解决
    TCP端口状态说明ESTABLISHED、TIME_WAIT
    HttpClient当HTTP连接的时候出现大量CLOSE_WAIT连接
    缓存穿透、击穿、雪崩
    Http长连接和Keep-Alive以及Tcp的Keepalive
    防止表单重复提交
  • 原文地址:https://www.cnblogs.com/airb/p/13097512.html
Copyright © 2011-2022 走看看