zoukankan      html  css  js  c++  java
  • Appium自动化如何控制多设备并行执行

     


    前言:

        如何做到,控制多设备并行执行测试用例呢。


    思路篇

      我们去想下,我们可以获取参数的信息,和设备的信息,那么​我们也可以针对每台设备开启不一样的端口服务。那么每个服务都对应的端口,我们在获取设备列表的时候,要和 每个服务对应起来,这样,我们开启一个进城池,我们在进程池里去控制设备,​每个进程池 控制不一样的设备即可。


    实现篇

      首先实现对应的参数篇和对应的设备端口,

        

    def startdevicesApp():
        l_devices_list=[]
        port_list=[]
        alldevices=get_devices()
        if len(alldevices)>0:
            for item in alldevices:
                port=random.randint(1000,6000)
                port_list.append(port)
                desired_caps = {
                        'platformName': 'Android',
                        'deviceName': item,
                        'platformVersion': getPlatForm(item),
                        'appPackage': get_apkname(apk_path),  # 包名
                        'appActivity': get_apk_lautc(apk_path),  # apk的launcherActivity
                        'skipServerInstallation': True,
                    "port":port
                    }
                l_devices_list.append(desired_caps)
        return  l_devices_list,port_list
    

        ​接下来,我们去​写一个端口开启服务。

    class RunServer(threading.Thread):#启动服务的线程
        def __init__(self, cmd):
            threading.Thread.__init__(self)
            self.cmd = cmd
        def run(self):
            os.system(self.cmd)
    def  start(port_list:list):
        def __run(url):
            time.sleep(10)
            response = urllib.request.urlopen(url, timeout=5)
            if str(response.getcode()).startswith("2"):
                return True
        for i in range(0, len(port_list)):
            cmd = "appium  -p %s  " % (
                port_list[i])
            if platform.system() == "Windows":  # windows下启动server
                t1 =RunServer(cmd)
                p = Process(target=t1.start())
                p.start()
                while True:
                    time.sleep(4)
                    if __run("http://127.0.0.1:" + port_list[i]+ "/wd/hub/status"):
                        break

    ​我们开启服务了,接下来,我们怎样根据​不同进程执行测试用例。

    def runcase(devics):
        #执行测试用例
        pass
    def run(deviceslist:list):
    ​
        pool = Pool(len(deviceslist))
        for i in deviceslist:
            pool.map(runcase, i)
        pool.close()
        pool.join()

      接下来,就是我们去组合形成最后的执行的代码。


        最终代码展示

    from appium import webdriver
    from androguard.core.bytecodes.apk import APK
    import os
    import random
    apk_path = "/Users/lileilei/Downloads/com.tencent.mobileqq_8.5.0_1596.apk"
    
    
    def get_devices() -> list:
        all_devices = []
        cmd = "adb devices"
        reslut = os.popen(cmd).readlines()[1:]
        for item in reslut:
            if item != "
    ":
                all_devices.append(str(item).split("	")[0])
        return all_devices
    
    
    def getPlatForm(dev: str) -> str:
        cmd = 'adb -s {} shell getprop ro.build.version.release'.format(dev)
        reslut = os.popen(cmd).readlines()[0]
        return str(reslut).split("
    ")[0]
    
    
    def get_apkname(apk):
        a = APK(apk, False, "r")
        return a.get_package()
    
    
    def get_apk_lautc(apk):
        a = APK(apk, False, "r")
        return a.get_main_activity()
    
    import  platform
    from multiprocessing import Process,Pool
    import time,urllib.request
    import threading
    class RunServer(threading.Thread):#启动服务的线程
        def __init__(self, cmd):
            threading.Thread.__init__(self)
            self.cmd = cmd
        def run(self):
            os.system(self.cmd)
    def  start(port_list:list):
        def __run(url):
            time.sleep(10)
            response = urllib.request.urlopen(url, timeout=5)
            if str(response.getcode()).startswith("2"):
                return True
        for i in range(0, len(port_list)):
            cmd = "appium  -p %s  " % (
                port_list[i])
            if platform.system() == "Windows":  # windows下启动server
                t1 =RunServer(cmd)
                p = Process(target=t1.start())
                p.start()
                while True:
                    time.sleep(4)
                    if __run("http://127.0.0.1:" + port_list[i]+ "/wd/hub/status"):
                        break
    
    def startdevicesApp():
        l_devices_list=[]
        port_list=[]
        alldevices=get_devices()
        if len(alldevices)>0:
            for item in alldevices:
                port=random.randint(1000,6000)
                port_list.append(port)
                desired_caps = {
                        'platformName': 'Android',
                        'deviceName': item,
                        'platformVersion': getPlatForm(item),
                        'appPackage': get_apkname(apk_path),  # 包名
                        'appActivity': get_apk_lautc(apk_path),  # apk的launcherActivity
                        'skipServerInstallation': True,
                    "port":port
                    }
                l_devices_list.append(desired_caps)
        return  l_devices_list,port_list
    def runcase(devics):
        #执行测试用例
        pass
    def run(deviceslist:list):
    
        pool = Pool(len(deviceslist))
        for devices in deviceslist:
            pool.map(runcase, devices)
        pool.close()
        pool.join()
    if  __name__=="__main__":
        l_devices_list,port_list=startdevicesApp()
        start(port_list)
        run(l_devices_list)

        文章首发在雷子说测试开发公众号

          

      

  • 相关阅读:
    sqlserver监控体系
    使SQL用户只能看到自己拥有权限的库
    存储过程版本控制-DDL触发器
    查看剩余执行时间
    迁移数据库文件位置
    sublime使用Package Control不能正常使用的解决办法
    未在本地计算机上注册“Microsoft.Jet.OLEDB.4.0”提供程序的处理方式
    1770Special Experiment
    1848Tree
    1322Chocolate
  • 原文地址:https://www.cnblogs.com/leiziv5/p/14225882.html
Copyright © 2011-2022 走看看