zoukankan      html  css  js  c++  java
  • python异常机制、多进程与PyQt5中的QTimer、多线程

    1.异常处理机制

    def test(x):
        try:
            y = 10 / x 
            print(y)
    	#except Exception as e:
    		#print(e)				#可以打印出异常的类型
        except ZeroDivisionError:   #抛出异常,执行下面的程序,如果是界面软件可以弹出一个窗口,提示用户输入错误
            print(x)
        else:                       #如果程序不存在异常,则执行该程序
            print(100 / x)
        finally:
            print('the program end')    #不管程序是否错误,始终都会执行
    test(0)
    # 0
    # the program end 
    test(1) 
    # 10.0
    # 100.0
    # the program end 
    def test2(): 
        try: 
            assert False, 'error'
        except:
            print('another')
        # 输出 another
        def myLevel(level):
            if level < 1:
                raise ValueError('Invalid level', level)  #自定义异常
        try:
            myLevel(0)    
        except ValueError:
            print('level < 1')
        #输出 :level < 1
    print('test 2--')
    test2()

    2.多进程

    引入多进程的库文件: import multiprocessing 

    多进程测试, 需要在主函数 main 中 进行测试
     创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
     方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
     属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置

    daemon 是守护进程 :daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。#主线程执行完毕之后,不管子线程是否执行完毕都随着主线程一起结束。

    def worker_1():
        print('this is worker 1')
        time.sleep(3)
        print('worker 1 end')
    #     n = 3
    #     while n > 0:
    #         print("The time is {0}".format(time.ctime()))
    #         time.sleep(interval)
    #         n -= 1
     
    def worker_2():
        print('this is worker 2')
        time.sleep(3)
        print('worker 2 end')
     
    def worker_3():
        print('this is worker 3')
        time.sleep(3)
        print('worker 3 end')
     
    if __name__ == "__main__":
        p1 = multiprocessing.Process(target = worker_1, args = ())
        p2 = multiprocessing.Process(target = worker_2, args = ())
        p3 = multiprocessing.Process(target = worker_3, args = ())
         
    #     p1.daemon = True    # 设置子程序为 守护进程, 主程序结束后,它就随之结束(即如果没有join函数,将不执行worker1函数), 需要放在start 前面
         
        p1.start()  #进程调用 start 的时候,将自动调用 run 函数
        p2.start()
        p3.start()      #运行3 个worker 程序花费的时间为3 s, 使用了3 个进程进行处理
        p1.join()
        p2.join()
        p3.join()        #添加 join 函数, 先执行 worker 函数, 再执行主进程函数
        for p in multiprocessing.active_children(): #这是主进程函数
            print('name is :' + p.name + 'pid is :' + str(p.pid) + 'is _alive: '+ str(p.is_alive()))
        print('电脑的核数'+ str(multiprocessing.cpu_count()))    # 电脑的核数4
        print('end----')

    输出:

    #---没有 join 函数---
    # name is :Process-1pid is :6488is _alive: True
    # name is :Process-2pid is :5660is _alive: True
    # name is :Process-3pid is :7776is _alive: True
    # 电脑的核数4
    # end----
    # this is worker 1
    # this is worker 2
    # this is worker 3
    # worker 1 end
    # worker 2 end
    # worker 3 end    
     
    #---有 join 函数---   等待所有子进程结束,再执行主进程
    # this is worker 1
    # this is worker 2
    # this is worker 3
    # worker 1 end
    # worker 2 end
    # worker 3 end
    # 电脑的核数4    #此时执行完线程, active_children() 为空
    # end----  

    使用多个进程访问共享资源的时候,需要使用Lock 来避免访问的冲突

    def worker_first(lock, f):
        with lock:
            with open(f, 'a+') as t:    #with 自动关闭文件
                n = 3
                while n > 0:
                    t.write('first write ---')
                    n -= 1
     
    def worker_second(lock, f):
        lock.acquire()  #获取锁
        try:
            with open(f, 'a+') as t:
                n = 3
                while n > 0:
                    t.write('second write ---')
                    n -= 1
        finally:
            lock.release()  #使用异常机制,保证最后释放锁
         
    def myProcess():
        lock = multiprocessing.Lock()
        f = r'D:myfile1.txt'
        p1 = multiprocessing.Process(target=worker_first, args=(lock, f, ))
        p2 = multiprocessing.Process(target=worker_second, args=(lock, f, ))
        p1.start()
        p2.start()
        print('end -----------')
    if __name__ == '__main__':
        myProcess()

    输出:

    first write ---first write ---first write ---second write ---second write ---second write ---

    进程池

    进程池 Pool 
     在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,
     此时可以发挥进程池的功效。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,
     那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
     
    apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的

    非阻塞和阻塞进程池

    单个函数使用线程池:

    def func(msg):
        print('msg is :' + msg)
        time.sleep(2)
        print(msg + ' end')
        return msg + ' re'
     
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=3)    #创建进程池
        result = [] 
        for i in range(4):
            msg = '%d'%(i)
    #         result.append(pool.apply_async(func, (msg, )))
            pool.apply_async(func, (msg, )) #非阻塞进程池, 维持总的进程数为 processes=3, 当一个进程结束后,添加另一个进程进去
    #         pool.apply(func, (msg, ))   #阻塞进程池
             
        print('start------')
        pool.close()  #  close 函数需要在 join 函数之前, 关闭进程池后,没有进程加入到pool中
        pool.join() # join函数等待所有子进程结束
        print('all program done')
    #     print([re.get() for re in result])  #获取结果

    输出:

    #---非阻塞进程池 输出------
    # start------
    # msg is :0
    # msg is :1
    # msg is :2
    # 0 end
    # msg is :3
    # 1 end
    # 2 end
    # 3 end
    # all program done    
    #---阻塞进程池 输出------ 顺序执行
    # msg is :0
    # 0 end
    # msg is :1
    # 1 end
    # msg is :2
    # 2 end
    # msg is :3
    # 3 end
    # start------
    # all program done

    多个函数使用线程池

    def f1():
        result = 0
        print('this is one')
        for i in range(100000):
            result += i
        return result
    
    def f2():
        result = 1
        print('this is two')
        for i in range(2, 100000):
            result *= i
        return result
    
    if __name__ == '__main__':
        start = time.time()
        myPool = multiprocessing.Pool(processes=2)   
        result = []
        print('start----')
        myFunc = [f1, f2]
        for f in myFunc:
            result.append(myPool.apply_async(f))    #将输出结果保存到列表中
              
        myPool.close()
        myPool.join()
        print([re.get() for re in result])
        end = time.time()
        print('花费的时间: ' + str(end-start)) #huafei de 时间: 8.397480010986328
        print('end------')

    使用get() 函数,同样可以获取pandas 的数据结构;

    def f1():
        result = 0
        print('this is one')
        result = pd.DataFrame(np.arange(16).reshape(-1, 4))
        return result
    #     for i in range(100000):
    #         result += i
    #     return result
    
    def f2():
        result = 1
        print('this is two')
        result = pd.DataFrame(np.arange(9).reshape(-1, 3))
        return result    
    #     for i in range(2, 100000):
    #         result *= i
    #     return result
    
    if __name__ == '__main__':
        start = time.time()
        myPool = multiprocessing.Pool(processes=2)   
        result = []
        print('start----')
        myFunc = [f1, f2]
        for f in myFunc:
            result.append(myPool.apply_async(f))    #将输出结果保存到列表中
              
        myPool.close()
        myPool.join()
        for re in result:
            print(re.get())

     输出结果:

    start----
    this is one
    this is two
        0   1   2   3
    0   0   1   2   3
    1   4   5   6   7
    2   8   9  10  11
    3  12  13  14  15
       0  1  2
    0  0  1  2
    1  3  4  5
    2  6  7  8
    花费的时间: 0.9370532035827637
    end------

    3.PyQt5 中的QTimer 模块

    pyqt5中的多线程的应用,多线程技术涉及3种方法,1.使用计时器QTimer, 2.使用多线程模块QThread ,3.使用事件处理功能
    1.使用QTimer模块,创建QTimer实例,将timeout信号连接到槽函数,并调用timeout 信号。 self.timer.start(1000)时间为毫秒
    引入模块
    from PyQt5.QtCore import QTimer, QDateTime

    def timerTest():
        class WinForm(QWidget):  
            def __init__(self,parent=None): 
                super(WinForm,self).__init__(parent) 
                self.setWindowTitle("QTimer demo")
                self.listFile= QListWidget() 
                self.label = QLabel('显示当前时间')
                self.startBtn = QPushButton('开始') 
                self.endBtn = QPushButton('结束') 
                layout = QGridLayout(self) 
        
                # 初始化一个定时器
                self.timer = QTimer(self)
                # showTime()方法
                self.timer.timeout.connect(self.showTime)   #
                
                layout.addWidget(self.label,0,0,1,2)   
                layout.addWidget(self.startBtn,1,0) 
                layout.addWidget(self.endBtn,1,1)         
                
                self.startBtn.clicked.connect( self.startTimer) #首先按下start开始计时,到了2s后,触发timeout 信号
                self.endBtn.clicked.connect( self.endTimer) 
                        
                self.setLayout(layout)   
                QTimer.singleShot(5000, self.showmsg)   #给定的时间间隔后,调用一个槽函数来发射信号
                
            def showmsg(self):
                QMessageBox.information(self, '提示信息', '你好')
               
            def showTime(self): 
                # 获取系统现在的时间
                time = QDateTime.currentDateTime() #获取当前时间
                # 设置系统时间显示格式
                timeDisplay = time.toString("yyyy-MM-dd hh:mm:ss dddd")
                # 在标签上显示时间
                self.label.setText( timeDisplay ) 
        
            def startTimer(self): 
                # 设置计时间隔并启动
                self.timer.start(1000)  #时间为 1 s
                self.startBtn.setEnabled(False)
                self.endBtn.setEnabled(True)
        
            def endTimer(self): 
                self.timer.stop()
                self.startBtn.setEnabled(True)  #依次循环
                self.endBtn.setEnabled(False)
                
        if __name__ == "__main__":  
            app = QApplication(sys.argv)  
            form = WinForm()  
            form.show()  
    #         QTimer.singleShot(5000, app.quit)    #界面运行5秒后,自动关闭
            sys.exit(app.exec_())
    
    timerTest()

    QThread模块,使用该模块开始一个线程,可以创建它的一个子类,然后覆盖QThread.run() 的方法。调用自定义的线程时,调用start()方法,会自动调用run的方法,
    QThread 还有 started , finished 信号,可以见信号连接到槽函数中

    def threadTest():
        class MainWidget(QWidget):
            def __init__(self,parent=None):
                super(MainWidget,self).__init__(parent)
                self.setWindowTitle("QThread 例子")    
                self.thread = Worker()  #创建线程
                self.listFile = QListWidget()
                self.btnStart = QPushButton('开始')
                self.btnStop = QPushButton('结束')
                layout = QGridLayout(self)
                layout.addWidget(self.listFile,0,0,1,2)
                layout.addWidget(self.btnStart,1,1) 
                layout.addWidget(self.btnStop,2,1)
                self.btnStart.clicked.connect( self.slotStart ) #通过按钮状态实现  线程的开启
                self.btnStop.clicked.connect(self.slotStop)
                self.thread.sinOut.connect(self.slotAdd)
             
            def slotStop(self):
                self.btnStart.setEnabled(True)
                self.thread.wait(2000)
              
            def slotAdd(self,file_inf):
                self.listFile.addItem(file_inf) #增加子项
                
            def slotStart(self):
                self.btnStart.setEnabled(False)
                self.thread.start() #线程开始
                
    #创建一个线程
        class Worker(QThread):
            sinOut = pyqtSignal(str) #自定义一个信号
            
            def __init__(self, parent=None):
                super(Worker, self).__init__(parent)
                self.work = True
                self.num = 0
            
            def run(self):
                while self.work == True:
                    listStr = 'this is index file {0}'.format(self.num)
                    self.sinOut.emit(listStr)   #发射一个信号
                    self.num += 1
                    self.sleep(2)       #休眠2 秒
            
            def __del__(self):
                self.work = False
                self.wait()
        
        if __name__ == "__main__":              
            app = QApplication(sys.argv)
            demo = MainWidget()
            demo.show()
            sys.exit(app.exec_())
            
    # threadTest()

    demo 2:

    def threadTest2():
        global sec
        sec=0
        
        class WorkThread(QThread):  #自定义一个线程
            trigger = pyqtSignal()
            def __int__(self):
                super(WorkThread,self).__init__()
        
            def run(self):
                for i in range(2000000000):
                    print('haha ' + str(i)) #此处放运行 量较大的程序
                
                # 循环完毕后发出信号        
                self.trigger.emit()        
        
        def countTime():
            global  sec
            sec += 1
            # LED显示数字+1
            lcdNumber.display(sec)          
        
        def work():
            # 计时器每秒计数
            timer.start(1000)   #每秒运行完后,是lcd增加数字
            # 计时开始    
            workThread.start()       
            # 当获得循环完毕的信号时,停止计数    
            workThread.trigger.connect(timeStop)  
        
        def timeStop():
            timer.stop()
            print("运行结束用时",lcdNumber.value())
            global sec
            sec=0
        
        if __name__ == "__main__":      
            app = QApplication(sys.argv) 
            top = QWidget()
            top.resize(300,120)
            
            # 垂直布局类QVBoxLayout
            layout = QVBoxLayout(top) 
            # 加个显示屏    
            lcdNumber = QLCDNumber()             
            layout.addWidget(lcdNumber)
            button = QPushButton("测试")
            layout.addWidget(button)
        
            timer = QTimer()
            workThread = WorkThread()
        
            button.clicked.connect(work)
            # 每次计时结束,触发 countTime
            timer.timeout.connect(countTime)      
        
            top.show()
            sys.exit(app.exec_())
            
    threadTest2()

    参考:

    多进程https://www.cnblogs.com/kaituorensheng/p/4445418.html

  • 相关阅读:
    全方位深度剖析--性能测试之LoardRunner 介绍
    国外性能测试博客
    由我主讲的软件测试系列视频之性能测试系列视频讲座目录出炉了
    性能测试之系统监控工具nmon
    性能测试学习内容指南
    性能测试之操作系统计数器分析方法
    JAVA正则表达式:Pattern类与Matcher类详解
    (总结)密码破解之王:Ophcrack彩虹表(Rainbow Tables)原理详解(附:120G彩虹表下载)
    border-collapse实现表格细线边框
    安卓造成内存泄露的几个原因
  • 原文地址:https://www.cnblogs.com/junge-mike/p/12761353.html
Copyright © 2011-2022 走看看