zoukankan      html  css  js  c++  java
  • python异常机制、多进程与PyQt5中的QTimer、多线程

    1.异常处理机制

    def test(x):
        try:
            y = 10 / x 
            print(y)
    	#except Exception as e:
    		#print(e)				#可以打印出异常的类型
        except ZeroDivisionError:   #抛出异常,执行下面的程序,如果是界面软件可以弹出一个窗口,提示用户输入错误
            print(x)
        else:                       #如果程序不存在异常,则执行该程序
            print(100 / x)
        finally:
            print('the program end')    #不管程序是否错误,始终都会执行
    test(0)
    # 0
    # the program end 
    test(1) 
    # 10.0
    # 100.0
    # the program end 
    def test2(): 
        try: 
            assert False, 'error'
        except:
            print('another')
        # 输出 another
        def myLevel(level):
            if level < 1:
                raise ValueError('Invalid level', level)  #自定义异常
        try:
            myLevel(0)    
        except ValueError:
            print('level < 1')
        #输出 :level < 1
    print('test 2--')
    test2()

    2.多进程

    引入多进程的库文件: import multiprocessing 

    多进程测试, 需要在主函数 main 中 进行测试
     创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
     方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
     属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置

    daemon 是守护进程 :daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。#主线程执行完毕之后,不管子线程是否执行完毕都随着主线程一起结束。

    def worker_1():
        print('this is worker 1')
        time.sleep(3)
        print('worker 1 end')
    #     n = 3
    #     while n > 0:
    #         print("The time is {0}".format(time.ctime()))
    #         time.sleep(interval)
    #         n -= 1
     
    def worker_2():
        print('this is worker 2')
        time.sleep(3)
        print('worker 2 end')
     
    def worker_3():
        print('this is worker 3')
        time.sleep(3)
        print('worker 3 end')
     
    if __name__ == "__main__":
        p1 = multiprocessing.Process(target = worker_1, args = ())
        p2 = multiprocessing.Process(target = worker_2, args = ())
        p3 = multiprocessing.Process(target = worker_3, args = ())
         
    #     p1.daemon = True    # 设置子程序为 守护进程, 主程序结束后,它就随之结束(即如果没有join函数,将不执行worker1函数), 需要放在start 前面
         
        p1.start()  #进程调用 start 的时候,将自动调用 run 函数
        p2.start()
        p3.start()      #运行3 个worker 程序花费的时间为3 s, 使用了3 个进程进行处理
        p1.join()
        p2.join()
        p3.join()        #添加 join 函数, 先执行 worker 函数, 再执行主进程函数
        for p in multiprocessing.active_children(): #这是主进程函数
            print('name is :' + p.name + 'pid is :' + str(p.pid) + 'is _alive: '+ str(p.is_alive()))
        print('电脑的核数'+ str(multiprocessing.cpu_count()))    # 电脑的核数4
        print('end----')

    输出:

    #---没有 join 函数---
    # name is :Process-1pid is :6488is _alive: True
    # name is :Process-2pid is :5660is _alive: True
    # name is :Process-3pid is :7776is _alive: True
    # 电脑的核数4
    # end----
    # this is worker 1
    # this is worker 2
    # this is worker 3
    # worker 1 end
    # worker 2 end
    # worker 3 end    
     
    #---有 join 函数---   等待所有子进程结束,再执行主进程
    # this is worker 1
    # this is worker 2
    # this is worker 3
    # worker 1 end
    # worker 2 end
    # worker 3 end
    # 电脑的核数4    #此时执行完线程, active_children() 为空
    # end----  

    使用多个进程访问共享资源的时候,需要使用Lock 来避免访问的冲突

    def worker_first(lock, f):
        with lock:
            with open(f, 'a+') as t:    #with 自动关闭文件
                n = 3
                while n > 0:
                    t.write('first write ---')
                    n -= 1
     
    def worker_second(lock, f):
        lock.acquire()  #获取锁
        try:
            with open(f, 'a+') as t:
                n = 3
                while n > 0:
                    t.write('second write ---')
                    n -= 1
        finally:
            lock.release()  #使用异常机制,保证最后释放锁
         
    def myProcess():
        lock = multiprocessing.Lock()
        f = r'D:myfile1.txt'
        p1 = multiprocessing.Process(target=worker_first, args=(lock, f, ))
        p2 = multiprocessing.Process(target=worker_second, args=(lock, f, ))
        p1.start()
        p2.start()
        print('end -----------')
    if __name__ == '__main__':
        myProcess()

    输出:

    first write ---first write ---first write ---second write ---second write ---second write ---

    进程池

    进程池 Pool 
     在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,
     此时可以发挥进程池的功效。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,
     那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
     
    apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的

    非阻塞和阻塞进程池

    单个函数使用线程池:

    def func(msg):
        print('msg is :' + msg)
        time.sleep(2)
        print(msg + ' end')
        return msg + ' re'
     
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=3)    #创建进程池
        result = [] 
        for i in range(4):
            msg = '%d'%(i)
    #         result.append(pool.apply_async(func, (msg, )))
            pool.apply_async(func, (msg, )) #非阻塞进程池, 维持总的进程数为 processes=3, 当一个进程结束后,添加另一个进程进去
    #         pool.apply(func, (msg, ))   #阻塞进程池
             
        print('start------')
        pool.close()  #  close 函数需要在 join 函数之前, 关闭进程池后,没有进程加入到pool中
        pool.join() # join函数等待所有子进程结束
        print('all program done')
    #     print([re.get() for re in result])  #获取结果

    输出:

    #---非阻塞进程池 输出------
    # start------
    # msg is :0
    # msg is :1
    # msg is :2
    # 0 end
    # msg is :3
    # 1 end
    # 2 end
    # 3 end
    # all program done    
    #---阻塞进程池 输出------ 顺序执行
    # msg is :0
    # 0 end
    # msg is :1
    # 1 end
    # msg is :2
    # 2 end
    # msg is :3
    # 3 end
    # start------
    # all program done

    多个函数使用线程池

    def f1():
        result = 0
        print('this is one')
        for i in range(100000):
            result += i
        return result
    
    def f2():
        result = 1
        print('this is two')
        for i in range(2, 100000):
            result *= i
        return result
    
    if __name__ == '__main__':
        start = time.time()
        myPool = multiprocessing.Pool(processes=2)   
        result = []
        print('start----')
        myFunc = [f1, f2]
        for f in myFunc:
            result.append(myPool.apply_async(f))    #将输出结果保存到列表中
              
        myPool.close()
        myPool.join()
        print([re.get() for re in result])
        end = time.time()
        print('花费的时间: ' + str(end-start)) #huafei de 时间: 8.397480010986328
        print('end------')

    使用get() 函数,同样可以获取pandas 的数据结构;

    def f1():
        result = 0
        print('this is one')
        result = pd.DataFrame(np.arange(16).reshape(-1, 4))
        return result
    #     for i in range(100000):
    #         result += i
    #     return result
    
    def f2():
        result = 1
        print('this is two')
        result = pd.DataFrame(np.arange(9).reshape(-1, 3))
        return result    
    #     for i in range(2, 100000):
    #         result *= i
    #     return result
    
    if __name__ == '__main__':
        start = time.time()
        myPool = multiprocessing.Pool(processes=2)   
        result = []
        print('start----')
        myFunc = [f1, f2]
        for f in myFunc:
            result.append(myPool.apply_async(f))    #将输出结果保存到列表中
              
        myPool.close()
        myPool.join()
        for re in result:
            print(re.get())

     输出结果:

    start----
    this is one
    this is two
        0   1   2   3
    0   0   1   2   3
    1   4   5   6   7
    2   8   9  10  11
    3  12  13  14  15
       0  1  2
    0  0  1  2
    1  3  4  5
    2  6  7  8
    花费的时间: 0.9370532035827637
    end------

    3.PyQt5 中的QTimer 模块

    pyqt5中的多线程的应用,多线程技术涉及3种方法,1.使用计时器QTimer, 2.使用多线程模块QThread ,3.使用事件处理功能
    1.使用QTimer模块,创建QTimer实例,将timeout信号连接到槽函数,并调用timeout 信号。 self.timer.start(1000)时间为毫秒
    引入模块
    from PyQt5.QtCore import QTimer, QDateTime

    def timerTest():
        class WinForm(QWidget):  
            def __init__(self,parent=None): 
                super(WinForm,self).__init__(parent) 
                self.setWindowTitle("QTimer demo")
                self.listFile= QListWidget() 
                self.label = QLabel('显示当前时间')
                self.startBtn = QPushButton('开始') 
                self.endBtn = QPushButton('结束') 
                layout = QGridLayout(self) 
        
                # 初始化一个定时器
                self.timer = QTimer(self)
                # showTime()方法
                self.timer.timeout.connect(self.showTime)   #
                
                layout.addWidget(self.label,0,0,1,2)   
                layout.addWidget(self.startBtn,1,0) 
                layout.addWidget(self.endBtn,1,1)         
                
                self.startBtn.clicked.connect( self.startTimer) #首先按下start开始计时,到了2s后,触发timeout 信号
                self.endBtn.clicked.connect( self.endTimer) 
                        
                self.setLayout(layout)   
                QTimer.singleShot(5000, self.showmsg)   #给定的时间间隔后,调用一个槽函数来发射信号
                
            def showmsg(self):
                QMessageBox.information(self, '提示信息', '你好')
               
            def showTime(self): 
                # 获取系统现在的时间
                time = QDateTime.currentDateTime() #获取当前时间
                # 设置系统时间显示格式
                timeDisplay = time.toString("yyyy-MM-dd hh:mm:ss dddd")
                # 在标签上显示时间
                self.label.setText( timeDisplay ) 
        
            def startTimer(self): 
                # 设置计时间隔并启动
                self.timer.start(1000)  #时间为 1 s
                self.startBtn.setEnabled(False)
                self.endBtn.setEnabled(True)
        
            def endTimer(self): 
                self.timer.stop()
                self.startBtn.setEnabled(True)  #依次循环
                self.endBtn.setEnabled(False)
                
        if __name__ == "__main__":  
            app = QApplication(sys.argv)  
            form = WinForm()  
            form.show()  
    #         QTimer.singleShot(5000, app.quit)    #界面运行5秒后,自动关闭
            sys.exit(app.exec_())
    
    timerTest()

    QThread模块,使用该模块开始一个线程,可以创建它的一个子类,然后覆盖QThread.run() 的方法。调用自定义的线程时,调用start()方法,会自动调用run的方法,
    QThread 还有 started , finished 信号,可以见信号连接到槽函数中

    def threadTest():
        class MainWidget(QWidget):
            def __init__(self,parent=None):
                super(MainWidget,self).__init__(parent)
                self.setWindowTitle("QThread 例子")    
                self.thread = Worker()  #创建线程
                self.listFile = QListWidget()
                self.btnStart = QPushButton('开始')
                self.btnStop = QPushButton('结束')
                layout = QGridLayout(self)
                layout.addWidget(self.listFile,0,0,1,2)
                layout.addWidget(self.btnStart,1,1) 
                layout.addWidget(self.btnStop,2,1)
                self.btnStart.clicked.connect( self.slotStart ) #通过按钮状态实现  线程的开启
                self.btnStop.clicked.connect(self.slotStop)
                self.thread.sinOut.connect(self.slotAdd)
             
            def slotStop(self):
                self.btnStart.setEnabled(True)
                self.thread.wait(2000)
              
            def slotAdd(self,file_inf):
                self.listFile.addItem(file_inf) #增加子项
                
            def slotStart(self):
                self.btnStart.setEnabled(False)
                self.thread.start() #线程开始
                
    #创建一个线程
        class Worker(QThread):
            sinOut = pyqtSignal(str) #自定义一个信号
            
            def __init__(self, parent=None):
                super(Worker, self).__init__(parent)
                self.work = True
                self.num = 0
            
            def run(self):
                while self.work == True:
                    listStr = 'this is index file {0}'.format(self.num)
                    self.sinOut.emit(listStr)   #发射一个信号
                    self.num += 1
                    self.sleep(2)       #休眠2 秒
            
            def __del__(self):
                self.work = False
                self.wait()
        
        if __name__ == "__main__":              
            app = QApplication(sys.argv)
            demo = MainWidget()
            demo.show()
            sys.exit(app.exec_())
            
    # threadTest()

    demo 2:

    def threadTest2():
        global sec
        sec=0
        
        class WorkThread(QThread):  #自定义一个线程
            trigger = pyqtSignal()
            def __int__(self):
                super(WorkThread,self).__init__()
        
            def run(self):
                for i in range(2000000000):
                    print('haha ' + str(i)) #此处放运行 量较大的程序
                
                # 循环完毕后发出信号        
                self.trigger.emit()        
        
        def countTime():
            global  sec
            sec += 1
            # LED显示数字+1
            lcdNumber.display(sec)          
        
        def work():
            # 计时器每秒计数
            timer.start(1000)   #每秒运行完后,是lcd增加数字
            # 计时开始    
            workThread.start()       
            # 当获得循环完毕的信号时,停止计数    
            workThread.trigger.connect(timeStop)  
        
        def timeStop():
            timer.stop()
            print("运行结束用时",lcdNumber.value())
            global sec
            sec=0
        
        if __name__ == "__main__":      
            app = QApplication(sys.argv) 
            top = QWidget()
            top.resize(300,120)
            
            # 垂直布局类QVBoxLayout
            layout = QVBoxLayout(top) 
            # 加个显示屏    
            lcdNumber = QLCDNumber()             
            layout.addWidget(lcdNumber)
            button = QPushButton("测试")
            layout.addWidget(button)
        
            timer = QTimer()
            workThread = WorkThread()
        
            button.clicked.connect(work)
            # 每次计时结束,触发 countTime
            timer.timeout.connect(countTime)      
        
            top.show()
            sys.exit(app.exec_())
            
    threadTest2()

    参考:

    多进程https://www.cnblogs.com/kaituorensheng/p/4445418.html

  • 相关阅读:
    java web项目打包.war格式
    version 1.4.2-04 of the jvm is not suitable for thi
    Sugarcrm Email Integration
    sharepoint 2010 masterpage中必须的Content PlaceHolder
    微信开放平台
    Plan for caching and performance in SharePoint Server 2013
    使用自定义任务审批字段创建 SharePoint 顺序工作流
    Technical diagrams for SharePoint 2013
    To get TaskID's Integer ID value from the GUID in SharePoint workflow
    how to get sharepoint lookup value
  • 原文地址:https://www.cnblogs.com/junge-mike/p/12761353.html
Copyright © 2011-2022 走看看