zoukankan      html  css  js  c++  java
  • 线程,进程 ,队列 基本用法总结

    一、线程(线程是最小的工作单位,同一进程内的线程共享资源)

    创建线程:threading模块

    创建一个线程:threading.Thread(target=函数名,args=(参数,) )  *这里的args后面必须是元祖,而且当括号内是一个参数时,第一个参数后加逗号

    我们利用threading模块创建的线程都是子线程,因为解释器在工作的时候会创建一个主线程来进行工作,我们自己创造的线程都是子线程

    import threading
    import  time
    def f1(a1,a2):
        time.sleep(2)
        print("456")
    t = threading.Thread(target = f1,args = (124,111,))
    t.setDaemon(True)#不等待子线程执行完成
    t.start()
    t1 = threading.Thread(target = f1,args = (124,111,))
    t1.setDaemon(True)#不等待子线程执行完成
    t1.start()
    t2 = threading.Thread(target = f1,args = (124,111,))
    t2.setDaemon(True)#不等待子线程执行完成
    t2.start()
    
    
    注:主线程是代码从上往下执行(我们看不见),setDaemon默认为False,即主线程等待子线程执行完,一起退出,如果我们设置t2.setDaemon(True),即主线程不等子线程执行完就退出

    t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

    t.join(参数) :中的参数表示最多的等待时间,如果函数的执行时间小于等待时间,那么就等待你函数的执行时长,如果等待时间大于函数执行时间,那么等待你参数时间后就不等待了

    t.setDaemon(True)#是决定是否等待子线程执行完成后再退出,加上表示不等待子线程执行完就退出,不加表示等待子线程后再退出

    import threading
    import  time
    def f1(a1,a2):
        time.sleep(3)
        print("456")
    t = threading.Thread(target = f1,args = (124,111,))
    t.setDaemon(True)#不等待子线程执行完成
    t.start()
    t.join(2)
    print("222")
    
    注:主线程执行时间是3,而我等待时间是2,所以在我等待的时间内子线程没有执行,所以只执行主线程就退出

     

    import threading
    import  time
    def f1(a1,a2):
        time.sleep(3)
        print("456")
    t = threading.Thread(target = f1,args = (124,111,))
    t.start()
    t.join(2)
    print("222")
    
    
    注:主线程执行时间是3,而我等待时间是2,所以在我等待的时间内子线程没有执行,所以我先执行主线程的再执行你子线程的

    二、 threading.Event

    Event是线程间通信最间的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。 Events 管理一个flag,这个flag可以使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True之前。flag默认为False。

    • Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
    • Event.set() :将标识位设为Ture
    • Event.clear() : 将标识伴设为False。
    • Event.isSet() :判断标识位是否为Ture。
    import threading
    import time
    def do(event):
        time.sleep(1)
        print('start')
        event.wait()
        print('execute')
    
    event_obj = threading.Event()
    for i in range(10):
        t = threading.Thread(target=do, args=(event_obj,))
        t.start()
    
    event_obj.clear()       #设置为False
    inp = input('input:')
    if inp == 'true':
        event_obj.set()     #设置为Ture
    

    三、线程锁

    由于同一进程下的多个线程共享同一内存地址的资源,所以就有可能出现不同的线程修改同一数据,导致脏数据(不正确的数据)的出现,所以我们应该在一个线程修改的时候加一把锁,等这个线程修改完后下一线程再来修改,所以就引进了线程锁的概念

    import threading
    import time
    globals_num = 0
    lock = threading.RLock()
    def Func():
        lock.acquire()  # 获得锁
        global globals_num
        globals_num += 1
        time.sleep(1)
        print(globals_num)
        lock.release()  # 释放锁
    for i in range(5):
        t = threading.Thread(target=Func)
        t.start()
    
    注:每一个线程执行的时候都加一把锁,就不会出现脏数据,只有前面一个线程执行完,解锁以后后面一个线程才能获得变量并对其进行修改,所以输出顺序是每一个输出都等待1s,而且输出顺序递增
    

    4、队列

    特性:先进先出,单项队列只能一进一出

    创建队列 :queue.Queue(参数)参数表示队列的容量即最多容纳的个数

    q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。
    
    q.join()    # 等到队列为空的时候,在执行别的操作
    q.qsize()   # 返回队列的大小 (不可靠)
    q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
    q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
    q.put(item, block=True, timeout=None) #  表示往队列里面放
    q.get(block=True, timeout=None) # 表示从队列里面取值,如果队列里面没值是会一直等即阻塞 
    q.put_nowait(item) #   等效于 put(item,block=False)
    q.get_nowait() #    等效于 get(item,block=False) 表示如果队列里面没值时不等
    

    生产者--消费者 模型

    import queue
    import threading
    message = queue.Queue(10)
    def producer(i):  #每一个生成的线程都一直往队列里面放对应生成的值
        while True:
            message.put(i)
    def consumer(i):  ##每一个生成的线程都一直从队列里面获取生成的值
        while True:
            msg = message.get()
            print(msg)
    for i in range(12):
        t = threading.Thread(target=producer, args=(i,)) #生成12个线程共同执行producer函数
        t.start()
    for i in range(10):
        t = threading.Thread(target=consumer, args=(i,))  # #生成10个线程共同执行consumer函数
        t.start()
    
    import queue
    import threading
    message = queue.Queue(10)
    def producer(i):   
            message.put(i)  #每一个线程把各自生成的数放到队列中,一共放了5个
    def consumer(i):
            msg = message.get() #一共有10个线程过来获取,只获取了5个就没有了,剩下的就一直等待
            print(msg)
    for i in range(5):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
    for i in range(10):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()
    
    import queue
    import threading
    message = queue.Queue(10)
    def producer(i):
            message.put(i)  #每一个线程把各自生成的数放到队列中,一共放了5个
    def consumer(i):
            msg = message.get_nowait()  #一共有10个线程过来获取,只获取了5个就没有了,不等待直接退出
            print(msg)
    for i in range(5):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
    for i in range(10):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()
    

     五、进程

    创建进程:multiprocessing模块

    创建一个进程:multiprocessing.Process(target=函数名,args=(参数,) ) *这里的args后面必须是元祖,而且当括号内是一个参数是,第一个参数后加逗号

    在windos下不支持fork()的创建,如果想用进程是必须把对进程的操作写在 if  __name__=="__main__"下,在linux下不用

    我们利用multiprocessing模块创建的线程都是子进程,因为解释器在工作的时候会创建一个主线程(包含在主进程中)来进行工作,我们自己创造的进程都是子进程

    import  time
    import multiprocessing
    def f1(a1,a2):
        time.sleep(2)
        print("456")
    if __name__ == '__main__':
        t = multiprocessing.Process(target = f1,args = (124,111,))
        t.daemon = True#不等待子进程执行完成
        t.start()
        t1 = multiprocessing.Process(target = f1,args = (124,111,))
        t1.daemon = True#不等待子进程执行完成
        t1.start()
        print("end")


    end

    注:主线程是代码从上往下执行(我们看不见),daemon默认为False,即主叫进程等待子进程执行完,一起退出,如果我们设置t1.daemon = True,即主进程不等待子进程执行完就退出
    import  time
    import multiprocessing
    def f1(a1,a2):
        time.sleep(2)
        print("456")
    if __name__ == '__main__':
        t = multiprocessing.Process(target = f1,args = (124,111,))
        t.daemon = True#不等待子进程执行完成
        t.start()
        print("111")
        t.join(1)
        print("end")

    111
    end

    注:先执行print("111") 在等待1函数没执行就不等待了,直接执行print("end")

     六、

    由于多个进程有自己的内存资源,所以修改只是修改了各自的东西,所以不存在线程锁的概念

    import multiprocessing
    li = []
    def f(a):
        li.append(a)  #不同的进程往不同的进程里面的li中添加元素
        print(li)
    if __name__ =="__main__":
        for a in range(5):
            t =  multiprocessing.Process(target=f,args=(a,))
            t.start()

    [0]
    [2]
    [1]
    [3]
    [4]

    进程间的数据共享 

    如果我们想让俩个不同的进程共同修改同一内容的话,即如果你真有需要 要共享数据, multiprocessing提供了两种方式。

    1、Array

    from multiprocessing import Process,Array
    
    temp = Array('i', [11,22,33,44])
    def Foo(i):
        temp[i] = 100+i
        for item in temp:
            print(i,'----->',item)
    for i in range(2):
        p = Process(target=Foo,args=(i,))
        p.start()
    

    2、Manager

    import time
    from  multiprocessing import Process,Manager
    def f(i,dic):
        dic[i] = 100 + i
        print(dic)
    if __name__  == "__main__":
        manage = Manager()
        dic = manage.dict()
        for i in range(5):
            p = Process(target= f,args=(i,dic,))
            p.start()
            p.join()  #必须要加,要不就找不到文件了

    {0: 100}
    {0: 100, 1: 101}
    {0: 100, 1: 101, 2: 102}
    {0: 100, 1: 101, 2: 102, 3: 103}
    {0: 100, 1: 101, 2: 102, 3: 103, 4: 104}

     进程池

     由于线程和进程并不是越多越好,而是应该创建最优的数量,所以就有了进程池的概念,创建一个进程池并不会立即创建进程,而是根据需要从进程池里面取的时候才会创建进程

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply 
    • apply_async
    from  multiprocessing import Process,Pool
    import time
    def Foo(i):
        time.sleep(2)
        return i+100
    def Bar(arg):
        print(arg)
    if __name__ == "__main__":
        pool = Pool(5)
        #print pool.apply(Foo,(1,))
        #print pool.apply_async(func =Foo, args=(1,)).get()
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,),callback=Bar)
        print('end')
        pool.close()
        pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。


    注:申请一个进程执行Foo方法,Foo方法没执行完,Bar方法永远不触发,只有Foo方法执行完了,才触发Bar方法,并将其返回值赋值给Bar方法的参数

      回调函数:用于验证主函数是否执行完

    apply :1、从进程池中申请进程

         2、每次申请的一个线程总都自带一个join

         3、特点是申请一个执行一个,完了再申请再执行

         4、因为每一个申请的进程都有一个join,所以任务执行过程是排队进行的

         5、申请的次数和进程池中进程的数量没有关系,因为我每次申请一个,等执行完第一个再执行下一个

    from multiprocessing import Pool
    import time
    def f1(a):
        time.sleep(1)
        print(a)
    if __name__ == "__main__":
        pool = Pool(5)
        for i in range(6):
            T = pool.apply(func=f1,args=(i,))
            print("222222222222")
    
    0
    222222222222
    
    1
    222222222222
    
    2
    222222222222
    
    3
    222222222222
    
    4
    222222222222
    
    5
    222222222222
    

    apply_async:1、从进程池中申请进程

            2、每次申请的一个线程都没有join,所以申请以后不等待执行就申请下一个

            3、如果申请的数量大于进程池中进程的数量,则每次申请的进程数就是进程池中的进程数,等待执行完毕,进程回到进程池后才可以从新申请

            4、每一个任务都是并发执行的,而且可以设置回调函数

            5、进程池的执行默认不等待子进程的执行,所以,如果主进程执行完毕后子进程没执行完就直接退出了,所以我们应该在主进程执行后加一个join,这样主进程                                    就会等待子进程执行完毕后一起退出(自己瞎猜的O(∩_∩)O哈哈~)

    from multiprocessing import Pool
    import time
    def f1(a):
        time.sleep(1)
        print(a)
    if __name__ == "__main__":
        pool = Pool(5)
        for i in range(9):
            pool.apply_async(func=f1,args=(i,))
            print("222222222222")
        pool.close() 
        pool.join() #进程池的方法
    
    222222222222
    222222222222
    222222222222
    222222222222
    222222222222
    222222222222
    222222222222
    222222222222
    222222222222
    
    
    0
    1
    2
    3
    4
    
    
    5
    6
    7
    8
    
  • 相关阅读:
    spirngmvc整合mybatis
    C#微信支付
    centos mysql数据库主从同步
    centos 搭建ftp
    修改 Docker 默认网桥地址
    安装docker
    脚本自动化装centos6.5 python2.6升级2.7
    centos6.5 python2.6升级2.7
    weblogic 安装及发布web应用
    centos6.5安装pip方法
  • 原文地址:https://www.cnblogs.com/luxiaojun/p/5601889.html
Copyright © 2011-2022 走看看