zoukankan      html  css  js  c++  java
  • web端自动化——python多线程

    Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁。threading基于Java的线程模型设计。

    锁(Lock)条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而在 Python中则是独立的对象。

    注意:我们应该避免使用thread模块,原因是thread模块不支持守护线程。当主线程退出时,所有的子线程不管它们是否还在工作,都会被强行退出。

    threading模块支持守护线程。

    1、进程与线程的区别:

    2、multiprocessing多进程模块

    多进程multiprocessing模块的使用与多线程threading模块的用法类似。multiprocessing提供了本地和远程的并发性,有效地通过全局解释锁(Global InterceptorLock,GIL)来使用进程(而不是线程)。由于GIL的存在,在CPU密集型的程序当中,使用多线程并不能有效地利用多核CPU的优势,因为一个解释器在同一时刻只会有一个线程在执行。所以,multiprocessing模块可以充分利用硬件的多处理器来进行工作。它支持UNlX和Windows系统上的运行。

    3、threading 多线程模块

    threading模块支持守护线程。

    4、Pipe和Queue

    multiprocessing提供threading包中没有的IPC(进程间通信),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式(因为它们根据的不是用户进程的资源)。

    multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue 可以用来传送常见的对象。

    ①    Pip可以是单向化(half-duplex),也可以是双向(duplex)。我们通过multiprocessing.Pipe (duplex=False)创建单向管道(默认为双向)。一个进程从pipe—端输入对象,然后被pipe 另一端的进程接收。单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

    Pipe.py

    import multiprocessing

    def proc1(pipe):

        pipe.send('hello')

        print('proc1 rec:', pipe.recv())

    def proc2(pipe):

        print('proc2 rec:', pipe.recv())

        pipe.send('hello, too')

    if __name__ == "__main__":

        multiprocessing.freeze_support()

        pipe = multiprocessing.Pipe()

        p1 = multiprocessing.Process(target=proc1,args=(pipe[0],))

        p2 = multiprocessing.Process(target=proc2,args=(pipe[1],))

        p1.start()

        p2.start()

        p1.join()

        p2.join()

    这里的pipe是双向的。pipe对象建立的时候,返回一个含有两个元素的表,每个元素 代表pipe的一端(Connection对象)。我们对pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

    ②     Queue类与Pipe相类似,都是先进先出结构。但Queue类允许多个进程放入,多个进程从队列取出对象。Queue类使用Queue (maxsize)创建,maxsize表示队列中可以存放对象的最大数量。

    queue.py

    import multiprocessing

    import os, time

    def inputQ(queue):

        #getpid是获得当前进程的进程号。系统每开辟一个新进程就会为他分配一个进程号。

        info = str(os.getpid()) + '(put):'+str(time.time())

        #put方法用以插入数据到队列中

        queue.put(info)

    def outputQ(queue, lock):

        #get方法可以从队列读取并且删除一个元素

        info = queue.get()

        lock.acquire()

        print((str(os.getpid()) + '(get):'+info))

        lock.release()

    if __name__=='__main__':

        record1 =[]

        record2 =[]

        lock = multiprocessing.Lock()#加锁,为防止散乱的打印

        queue = multiprocessing.Queue(3)

        for i in range(10):

            process = multiprocessing.Process(target=inputQ,args=(queue,))

            process.start()

            record1.append(process)

        for i in range(10):

            process = multiprocessing.Process(target=outputQ,args=(queue,lock))

            process.start()

            record2.append(process)

        for p in record1:

            p.join()

        for p in record2:

            p.join()

        queue.close() #没有更多的对象进来,关闭queue

    5、多线程分布式执行测试用例

    Selenium Grid只是提供多系统、多浏览器的执行环境,Selenium Grid本身并不提供并行的执行测试用例,这个我们在前面己经反复强调。下面就通过演示使用多线程技术结合Selenium Grid实现分布式并行地执行测试用例。

    启动 Selenium Server

    在本机打开两个命令提示符窗口。

    本机启动一个主hub和一个node节点(端口号别分为4444和5555),本机IP地址为: 172.16.101.239。

    >java -jar selenium-server-standalone-2.47.0.jar -role hub

    >java -jar selenium-server-standalone-2.47.0.jar -role node -port 5555

    启动一个远程的node(设罝端口号为6666),IP地址假设为:172.16.101.238。

    >java -jar selenium-server-standalone-2.47.jar -role node -port 6666 -hub http://172.16.101.239:4444/grid/register

    运行测试脚本。

    grid_thread.py

    from threading import Thread

    from selenium import webdriver

    from time import sleep,ctime

    #测试用例

    def test_baidu(host,browser):

        print('start:%s' %ctime())

        print(host, browser)

        dc = {'browserName':browser}

        driver = webdriver.Remote(conunand_executor=host,

                                  desired_capabilities=dc)

        driver.get('http://www.baidu.com')

        driver.find_element_by_id("kw").send_keys(browser)

        driver.find_element_by_id("su").click()

        sleep(2)

        driver.quit()

    if __name__ =='__main__':   

    #启动参数(指定运行主机与浏览器)

        lists = {'http://127.0.0.1:4444/wd/hub': 'chrome',

                 'http://127.0.0.1:5555/wd/hub': 'internet explorer',

                 'http://172.16.101.238:6666/wd/hub':'firefox', # 远程节点

                 }

        threads =[]

        files = range(len(lists))

        #创建线程

        for host, browser in lists.items():

                t = Thread(target=test_baidu, args=(host, browser))

                threads.append(t)

        #启动线程

        for i in files:

            threads[i].start()

        for i in files:

            threads [i].join()

        print('end:%s' %ctime())

     6、多线程执行测试用例

    from threading import Thread
    from selenium import webdriver
    from time import ctime,sleep
    #测试用例
    def test_baidu(browser,search):
    print('start:%s' %ctime())
    print('browser:%s ,' %browser)
    if browser == "chrome":
    driver = webdriver.Chrome()
    elif browser == "ff":
    driver = webdriver.Firefox()
    else:
    print('browser参数有误,只能为ff、chrome')
    driver.get('http://www.baidu.com')
    driver.find_element_by_id("kw").send_keys(search)
    driver.find_element_by_id('su').click()
    sleep(2)
    driver.quit()
    if __name__ == "__main__":
    #启动参数(指浏览器与百度搜索内容)
    lists ={'chrome':'threading', 'ff':'python'}
    threads =[]
    files = range(len(lists))
    #创建线程
    for browser, search in lists.items():
    t = Thread(target=test_baidu,args=(browser,search))
    threads.append(t)
    #启动线程
    for t in files:
    threads[t].start()
    for t in files:
    threads[t].join()
    print('end:%s' %ctime())
  • 相关阅读:
    vSan中见证组件witness详解
    zabbix监控企业esxi虚拟机
    新特性之MAPI over HTTP 配置 MAPI over HTTP
    Exchange Server 产品路线图 及 补丁下载
    人生的第一桶金
    这不是我想要的生活,努力才是王道!
    孤独的灵魂该去何处安家
    如何查看myeclipse是否激活
    Visual Studio 2013如何破解(密钥激活)
    unity破解步骤
  • 原文地址:https://www.cnblogs.com/linxiu-0925/p/10077413.html
Copyright © 2011-2022 走看看