zoukankan      html  css  js  c++  java
  • python多线程并发

    1、循环创建多个线程,并通过循环启动执行

    import threading
    from datetime import *
    from time import sleep
    
    # 单线程执行
    def test():
        print('hello world')
    
    t = threading.Thread(target=test)
    t.start()
    
    # 多线程执行
    def test_01():
        sleep(1)
        x = 0
        while x == 0:        # 设置一个死循环
            print(datetime.now())             # 获取当前系统时间
    
    def looptest():
        '''
        循环20次执行 test_o1()函数
        :return:
        '''
        for i in range(20):
            test_01()
    
    def thd():
        '''
        创建并执行多个线程
        需求:并发执行50次 test_o1()函数
        说明:把50的并发拆成25个线程组,每个线程再循环20次执行 test_o1()函数,这样在启动下一个线程的时候,
        上一个线程已经在循环了,以此类推,当启动第25个线程的时候,可能已经执行了200次的t est_o1()函数,
        这样就可以大大减少并发的时间差异
        :return:
        '''
        Threads = []
        for i in range(25):
            th = threading.Thread(target=looptest)
            Threads.append(th)
            '''
            守护线程:主线程执行完毕之后,会等待子线程全部执行完毕,才会关闭结束程序
            必须加在start()之前,默认为 false
            '''
            th.setDaemon(True)
        for th in Threads:
            th.start()
        for th in Threads:
            '''
            阻塞线程:等主线程执行完毕之后再关闭所有子线程
            必须加在start()之后
            可以通过join()的timeout参数来完美解决相互等待的问题,子线程告诉主线程让其等待0.04秒,
            0.04秒之内子线程完成,主线程就继续往下执行,0.04秒之后如果子线程还未完成,主线程也会
            继续往下执行,执行完成之后关闭子线程
            '''
            th.join(0.04)
    
    if __name__=="__main__":
        print('start')
        thd()
        print('end')

    2、并发测试框架

    # 并发测试框架
    THREAD_NUM = 1
    ONE_WORKER_NUM = 1
    def test():
        pass            # 测试代码
    
    def working():
        global ONE_WORKER_NUM
        for i in range(0, ONE_WORKER_NUM):
            test()
            
    def t():
        global THREAD_NUM
        Threads = []
        for i in range(THREAD_NUM):
            t = threading.Thread(target=working,name='T'+str(i))
            t.setDaemon(True)
            Threads.append(t)
        for t in Threads:
            t.start()
        for t in Threads:
            t.join()
            
    if __name__=="__main__":
        t()
  • 相关阅读:
    objectARX 获取ucs的X方向
    passivedns 安装指南
    sql server手工注入
    libimobiledevice安装步骤
    CAD系统变量(参数)大全
    objectARX判断当前坐标系
    access手工注入
    网站自动跳转
    python黑帽子源码
    objectarx 卸载加载arx模块
  • 原文地址:https://www.cnblogs.com/jasmine0627/p/11674830.html
Copyright © 2011-2022 走看看