目的:
通过添加设备号,则自动给添加的设备分配端口,启动对应的appium服务。注意:为了方便,将共用一个配置文件。
1、公共的配置文件名称:desired_caps.yaml
platformVersion: 5.1.1 platformName: Android deviceName: oppo appPackage: com.iBer.iBerAppV2 appActivity: com.iBer.iBerAppV2.MainActivity #appPackage: com.android.mms #appActivity: /com.qiku.android.mms.ui.MmsConversationListActivity noReset: False unicodeKeyborad: True #使用Unicode编码方式发送字符串 resetKeyborad: True #隐藏键盘 ip: 127.0.0.1 #port: 4723 #devices_list: ["7f4bd69a","57614229"] #给出需要启动的设备udid,此处启动2个真机 #devices_list: ["57614229","127.0.0.1:62001"] devices_list: ["57614229"] phone: ["12606666333"]
2、自动根据添加的设备分配端口,前提:检查当前分配的端口是否被占用,若已被占用则自动删除此端口的进程,重新分配此端口。文件名称:Check_port.py
# -*- coding: utf-8 -*- ''' 意义:端口的自动检测 ''' import socket import os import re def check_port(host,port): '''检测端口是否可用''' s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) #socket.SOCK_STREAM 或 SOCK_DGRAM try: s.connect((host,port)) s.shutdown(2) #禁止在一个socket上进行数据的接收与发送,利用shutdown函数使socket双向数据传输变为单向数据传输, # shutdown需要一个单独的参数,该参数表示了如何关闭socket,0:表示禁止将来读,1:表示禁止将来写,2:表示禁止将来读写 #except OSError as msg: except: print("port %s is avaliable")%port #端口可用 return True else: print ("port %s alreadly be in use !!!")%port #端口已被占用 return False def release_port(port): print"release_port-------------杀掉已占用的端口" '''杀掉正在执行的端口''' cmd = "lsof -i:%s|awk 'NR==2{print $2}'" % port pid = os.popen(cmd).read() print "find port:%s, pid is value:%s" %(port,pid) cmd = "kill -9 %s" % pid print "kill port:%s,pid value:%s"%(port,pid) os.popen(cmd).read() # if __name__ =="__main__": # host = "127.0.0.1" # port = 4726 # check_port(host,port) # release_port(port)
3、检查appium服务是否开启,根据当前连接的设备数量,自动开启对应的appium服务,文件名称为:muti_appium.py
# -*- coding: utf-8 -*- ''' python启动多个appium服务----并发 ''' import subprocess from time import ctime import sys,os import yaml reload(sys) sys.setdefaultencoding('utf8') path = os.getcwd() print path with open(path+"/desired_caps.yaml","r") as file: data = yaml.load(file) devices_list = data["devices_list"] def appium_start(host,port): print "-----------------------------appium_start-----------------------------" '''启动appium服务''' #此处的port+3是为了appium服务的启动端口不跟multi_device中的端口重复 bootstrap_port = str(port+3) print "bootstrap_port" + str(bootstrap_port) '''# win start /b appium -a 127.0.0.1 -p 4723 --log xxx.log --local-timezone # mac appium -a 127.0.0.1 -p 4723 --log xxx.log --local-timezone &''' # cmd = "start /b appium -a"+host+" -p "+str(port)+" -bp "+str(bootstrap_port) windows上的写法 cmd = "appium -a " + host + " -p " + str(port) + " -bp " + str(bootstrap_port) print "%s at %s"%(cmd,ctime()) print path+'/appium_log/'+str(port)+'.log' logpath = os.getcwd()[:-5] print logpath subprocess.Popen(cmd,shell=True,stdout=open(logpath+'/appium_log/'+str(port)+'.log','wa'),stderr=subprocess.STDOUT) # if __name__ == "__main__": # host = "127.0.0.1" # port = 4723 # appium_start(host,port) # appium_process = [] lucky注销,释放开则是并发的启动appium服务 # for i in range(len(devices_list)): #根据连接Android设备的数量,可更改此处的值 host = "127.0.0.1" port = 4723+2+i print port appium_start(host, port) # appium = multiprocessing.Process(target=appium_start,args=(host,port)) # appium_process.append(appium) # if __name__ == "__main__": lucky注销 # # for appium in appium_process: # appium.start() # for appium in appium_process: # appium.join()
4、检查当前添加的设备号,自动开始执行多设备。文件名称为:muti_device.py
# -*- coding: utf-8 -*- from appium import webdriver from time import ctime import yaml import sys from Add_Case_Gather import Run_test import os,time from time import sleep #为了读取yaml中的中文,否则会报错 reload(sys) sys.setdefaultencoding('utf8') #获取desired_caps.yaml的存放路径 path = os.getcwd() with open(path+"/desired_caps.yaml","r") as file: data = yaml.load(file) devices_list = data["devices_list"] #开始执行设备 for i in range(len(devices_list)): port = 4723 + 2 + i def appium_desire(udid,port): print "--------------------appium_desire-------------------------" desired_caps = {} desired_caps["platformName"] = data["platformName"] desired_caps["platformVersion"] = data["platformVersion"] desired_caps["deviceName"] = data["deviceName"] desired_caps["udid"]=udid desired_caps["appPackage"] = data["appPackage"] desired_caps["appActivity"] = data["appActivity"] desired_caps["noReset"] = data["noReset"] print ("appium port:%s start run %s at %s" %(port,udid,ctime())) print str(data['ip'])+str(port) driver = webdriver.Remote('http://'+str(data['ip'])+':'+str(port)+'/wd/hub',desired_caps) print driver #调用需要执行的步骤操作方法 Run_test(driver).run_test1() return devices_list[i],port
5、将如上的文件共同在一个文件中进行启动调用,文件名称为:Run_Test.py
# -*- coding: utf-8 -*- '''并发的测试 主要功能: 1、检查给定的端口是否被占用,如果占用则自动释放 2、并发启动appium服务 3、并发启动device服务 ''' from Test.Common.multi_appium import appium_start #from multi_device import appium_desire from Test.Common.multi_device import appium_desire from Test.Common.Check_port import * from time import sleep import multiprocessing import sys import yaml #为了读取yaml中的中文,否则会报错 reload(sys) sys.setdefaultencoding('utf8') # 获取desired_caps.yam的路径 path = os.getcwd() with open(path+"/desired_caps.yaml","r") as file: data = yaml.load(file) devices_list = data["devices_list"] def start_appium_action(host,port): print "start_appium_action------------------" if check_port(host,port)==False: release_port(port) elif check_port(host,port)==True: appium_start(host, port) return True else: print("appium %s start faild!"%port) return False def start_devices_action(udid,port): host = "127.0.0.1" appium_desire(udid, port) def appium_start_sync(): appium_process = [] for i in range(len(devices_list)): host = "127.0.0.1" port = 4723+2+i appium = multiprocessing.Process(target=start_appium_action,args=(host,port)) appium_process.append(appium) for appium in appium_process: appium.start() for appium in appium_process: appium.join() sleep(5) def devices_start_sync(): desired_process = [] for i in range(len(devices_list)): port = 4723+2+i desired = multiprocessing.Process(target=start_devices_action, args=(devices_list[i], port)) desired_process.append(desired) for desired in desired_process: desired.start() for desired in desired_process: desired.join() if __name__ == "__main__": appium_start_sync() devices_start_sync()
如上已完成了多设备的添加,如果需要新增多个设备,则在desired_caps.yaml中的 devices_list: ["57614229"]中新增设备号,然后运行Run_Test.py即可。