zoukankan      html  css  js  c++  java
  • Appium+Python3 并发测试实例

    from selenium import webdriver
    import yaml
    from time import ctime
    
    # Load the shared capability values once, at import time.
    # safe_load is used because yaml.load() without an explicit Loader is
    # rejected by PyYAML >= 6 and can build arbitrary objects on older releases.
    with open('desired_caps.yaml','r') as f:
        data = yaml.safe_load(f)

    # Identifiers of the two target devices; entries are passed to
    # appium_devices() as the `udid` capability.
    desired_list=['127.0.0.1:62001','127.0.0.1:62025']
    def appium_devices(udid,port):
        '''Open an Appium session for one device and return its driver.

        Args:
            udid: device identifier, sent as the `udid` capability.
            port: port of the Appium server to connect to.

        Returns:
            The `webdriver.Remote` instance created for the session.

        Raises:
            KeyError: if the loaded YAML config lacks one of the expected keys.
        '''
        # Capabilities copied verbatim from the YAML config. The original code
        # misspelled 'platformVersion' as 'platformVerion', so the version was
        # never sent under the capability name Appium reads.
        shared_keys=(
            'platformName',
            'platformVersion',
            'deviceName',  # per the original author: has no practical effect
            'app',
            'noReset',
            'appPackage',
            'appActivity',
        )
        desired_caps={key:data[key] for key in shared_keys}
        desired_caps['udid']=udid

        print('appium port:%s start run %s at %s'%(port,udid,ctime()))
        server_url='http://'+str(data['ip'])+':'+str(port)+'/wd/hub'
        driver=webdriver.Remote(server_url,desired_caps)
        return driver
    
    if __name__ == '__main__':
        # Demo: connect the first device through port 4723, the second through 4725.
        for position,server_port in enumerate((4723,4725)):
            appium_devices(desired_list[position],server_port)
    import subprocess
    from time import ctime
    
    def appium_start(host,port):
        '''Launch an Appium server in the background through the Windows shell.

        The server is started on `host`:`port` with `port`+1 as its bootstrap
        port. Its stdout and stderr are appended to ./appium_log/<port>.log.

        Args:
            host: address passed to appium's `-a` option.
            port: integer port passed to `-p`; `port`+1 is passed to `-bp`.
        '''
        import os  # local import: this snippet's header only imports subprocess and ctime

        bootstrap_port=port+1
        # `start /b` is the cmd.exe way to run a command without opening a new window.
        cmd='start /b appium -a '+ host +' -p '+ str(port) +' -bp '+ str(bootstrap_port)

        print('%s at %s'%(cmd,ctime()))

        # Create the log directory on first use instead of failing in open().
        log_dir='./appium_log'
        os.makedirs(log_dir,exist_ok=True)

        # The child process gets its own copy of the handle, so the parent can
        # close its copy as soon as Popen returns instead of leaking it.
        with open(log_dir+'/'+str(port)+'.log','a') as log_file:
            subprocess.Popen(cmd,shell=True,stdout=log_file,stderr=subprocess.STDOUT)
    
    if __name__ == '__main__':
        host='127.0.0.1'

        # Start two server processes, on ports 4723 and 4725.
        for offset in (0,2):
            appium_start(host,4723+offset)
    import socket
    import os
    
    def check_port(host,port):
        '''Report whether `port` on `host` is free.

        A TCP connection is attempted: if it fails, nothing is accepting
        connections there and the port is treated as available.

        Args:
            host: address to probe.
            port: integer TCP port to probe.

        Returns:
            True if the connection attempt failed (port available),
            False if it succeeded (port already in use).
        '''
        # The context manager closes the probe socket on every path; the
        # original never closed it.
        with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as s:
            try:
                s.connect((host,port))
            except OSError as msg:
                print('port %s is available'%port)
                print(msg)
                return True

        # Only the connect result decides the answer, so an error while
        # tearing the probe down can no longer be misread as "available".
        print('port %s is already use'%port)
        return False
    
    
    def _find_listening_pid(netstat_output,port):
        '''Return the PID (as a string) of the socket LISTENING on `port`, or None.'''
        wanted_suffix=':%s'%port
        for line in netstat_output.splitlines():
            fields=line.split()
            # Expected `netstat -aon` TCP row layout:
            # Proto, Local Address, Foreign Address, State, PID
            if len(fields)<5 or fields[3]!='LISTENING':
                continue
            # Match the port at the end of the local address, so a port such as
            # 14725 or a matching PID is not mistaken for 4725.
            if fields[1].endswith(wanted_suffix):
                return fields[4]
        return None


    def relase_port(port):
        '''Kill the process that is listening on `port`.

        Runs `netstat -aon | findstr <port>` to find the owning PID, then
        `taskkill` to terminate it. If nothing is listening on the port, only
        a message is printed.

        Args:
            port: port number to free.
        '''
        # Look up the sockets that mention the port.
        cmd_find='netstat -aon | findstr %s'%port
        print(cmd_find)

        # Capture the command output.
        result=os.popen(cmd_find).read()
        print(result)

        pid=_find_listening_pid(result,port)
        if pid is None:
            print('port %s is available'%port)
            return

        # Terminate the process that holds the port.
        cmd_kill='taskkill -f -pid %s'%pid
        print(cmd_kill)
        os.popen(cmd_kill)
    
    
    if __name__ == '__main__':
        # Demo: free port 4725. `host` is only needed by the check_port probe,
        # which is not run here.
        host,port='127.0.0.1',4725
        relase_port(port)

    实例:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time : 2019-08-07 12:03
    # @Author : zhouyang
    # @File : appium_devices_sync.py
    '''Appium concurrency test.

    Starts two Appium servers concurrently, then starts two devices
    concurrently to test the Kaoyanbang app.

    The two Appium servers use these ports:
    Appium server port: 4723, bootstrap (bp) port: 4724
    Appium server port: 4725, bootstrap (bp) port: 4726

    The two devices:
    127.0.0.1:62001
    127.0.0.1:62025
    '''
    
    from appium_sync.mulit_devices import appium_devices
    from appium_sync.mulit_appium import appium_start
    from appium_sync.check_port import *
    import multiprocessing
    from time import sleep
    
    # Identifiers (host:port) of the two devices to drive.
    desired_list=['127.0.0.1:62001','127.0.0.1:62025']
    
    def start_appium_action(host,port):
        '''Start an Appium server on `host`:`port` if check_port allows it.

        Returns:
            True when check_port() returned True and appium_start() was called,
            False otherwise (a failure message is printed).
        '''
        if not check_port(host,port):
            print('appium %s is start fail'%port)
            return False

        appium_start(host,port)
        return True
    
    def start_device_action(udid,port):
        '''Start an Appium server on `port`, then open a session for `udid`.

        If the server could not be started, relase_port() is called for the
        port and no session is opened.
        '''
        server_started=start_appium_action('127.0.0.1',port)
        if not server_started:
            # NOTE(review): this also runs when a server was started on the port
            # beforehand; confirm that releasing the port is intended then.
            relase_port(port)
            return

        appium_devices(udid,port)
    
    def appium_start_sync():
        '''Run start_appium_action for ports 4723 and 4725 in parallel processes.

        Waits for both processes to finish, then sleeps for 5 seconds.
        '''
        print('=======appium_start_sync=======')

        # One worker process per server port (4723, 4725).
        workers=[
            multiprocessing.Process(target=start_appium_action, args=('127.0.0.1', 4723 + 2 * index))
            for index in range(2)
        ]

        # Start every worker before waiting on any of them.
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        sleep(5)
    
    def devices_start_sync():
        '''Run start_device_action for every device in parallel processes.

        The device at index i of `desired_list` is paired with port 4723 + 2*i.
        Waits for all processes to finish, then sleeps for 5 seconds.
        '''
        print('======devices_start_sync=======')

        # Build one process per device. The original looped over the (empty)
        # process list itself, so no process was ever created; the devices
        # come from desired_list.
        desired_process = [
            multiprocessing.Process(target=start_device_action, args=(udid, 4723 + 2 * index))
            for index, udid in enumerate(desired_list)
        ]

        # Start all processes.
        for desired in desired_process:
            desired.start()
        # Wait for all of them to finish.
        for desired in desired_process:
            desired.join()

        sleep(5)
    
    if __name__ == '__main__':
        # Bring up the Appium servers first, then the device sessions.
        for stage in (appium_start_sync,devices_start_sync):
            stage()
  • 相关阅读:
    angular-utils-pagination 使用案例
    解决mongodb ISODate相差8小时问题
    Android HttpClient POST JSON Restful-web-services
    将Map转换为Java 对象
    request.getParameterMap()
    angular-fullstack test
    oracle insert &字符插入问题
    listview底部增加按钮
    IDE 常用快捷键记录
    一个jQuery扩展工具包
  • 原文地址:https://www.cnblogs.com/xiuxiu123456/p/11322459.html
Copyright © 2011-2022 走看看