一、线程(线程是最小的工作单位,同一进程内的线程共享资源)
创建线程:threading模块
创建一个线程:threading.Thread(target=函数名,args=(参数,) ) *这里的args后面必须是元祖,而且当括号内是一个参数时,第一个参数后加逗号
我们利用threading模块创建的线程都是子线程,因为解释器在工作的时候会创建一个主线程来进行工作,我们自己创造的线程都是子线程
import threading import time def f1(a1,a2): time.sleep(2) print("456") t = threading.Thread(target = f1,args = (124,111,)) t.setDaemon(True)#不等待子线程执行完成 t.start() t1 = threading.Thread(target = f1,args = (124,111,)) t1.setDaemon(True)#不等待子线程执行完成 t1.start() t2 = threading.Thread(target = f1,args = (124,111,)) t2.setDaemon(True)#不等待子线程执行完成 t2.start() 注:主线程是代码从上往下执行(我们看不见),setDaemon默认为False,即主线程等待子线程执行完,一起退出,如果我们设置t2.setDaemon(True),即主线程不等子线程执行完就退出
t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.join(参数) :中的参数表示最多的等待时间,如果函数的执行时间小于等待时间,那么就等待你函数的执行时长,如果等待时间大于函数执行时间,那么等待你参数时间后就不等待了
t.setDaemon(True)#是决定是否等待子线程执行完成后再退出,加上表示不等待子线程执行完就退出,不加表示等待子线程后再退出
import threading import time def f1(a1,a2): time.sleep(3) print("456") t = threading.Thread(target = f1,args = (124,111,)) t.setDaemon(True)#不等待子线程执行完成 t.start() t.join(2) print("222") 注:主线程执行时间是3,而我等待时间是2,所以在我等待的时间内子线程没有执行,所以只执行主线程就退出
import threading import time def f1(a1,a2): time.sleep(3) print("456") t = threading.Thread(target = f1,args = (124,111,)) t.start() t.join(2) print("222") 注:主线程执行时间是3,而我等待时间是2,所以在我等待的时间内子线程没有执行,所以我先执行主线程的再执行你子线程的
二、 threading.Event
Event是线程间通信最间的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。 Events 管理一个flag,这个flag可以使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True之前。flag默认为False。
- Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
- Event.set() :将标识位设为Ture
- Event.clear() : 将标识伴设为False。
- Event.isSet() :判断标识位是否为Ture。
import threading import time def do(event): time.sleep(1) print('start') event.wait() print('execute') event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() #设置为False inp = input('input:') if inp == 'true': event_obj.set() #设置为Ture
三、线程锁
由于同一进程下的多个线程共享同一内存地址的资源,所以就有可能出现不同的线程修改同一数据,导致脏数据(不正确的数据)的出现,所以我们应该在一个线程修改的时候加一把锁,等这个线程修改完后下一线程再来修改,所以就引进了线程锁的概念
import threading import time globals_num = 0 lock = threading.RLock() def Func(): lock.acquire() # 获得锁 global globals_num globals_num += 1 time.sleep(1) print(globals_num) lock.release() # 释放锁 for i in range(5): t = threading.Thread(target=Func) t.start() 注:每一个线程执行的时候都加一把锁,就不会出现脏数据,只有前面一个线程执行完,解锁以后后面一个线程才能获得变量并对其进行修改,所以输出顺序是每一个输出都等待1s,而且输出顺序递增
4、队列
特性:先进先出,单项队列只能一进一出
创建队列 :queue.Queue(参数)参数表示队列的容量即最多容纳的个数
q = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。 q.join() # 等到队列为空的时候,在执行别的操作 q.qsize() # 返回队列的大小 (不可靠) q.empty() # 当队列为空的时候,返回True 否则返回False (不可靠) q.full() # 当队列满的时候,返回True,否则返回False (不可靠) q.put(item, block=True, timeout=None) # 表示往队列里面放 q.get(block=True, timeout=None) # 表示从队列里面取值,如果队列里面没值是会一直等即阻塞 q.put_nowait(item) # 等效于 put(item,block=False) q.get_nowait() # 等效于 get(item,block=False) 表示如果队列里面没值时不等
生产者--消费者 模型
import queue import threading message = queue.Queue(10) def producer(i): #每一个生成的线程都一直往队列里面放对应生成的值 while True: message.put(i) def consumer(i): ##每一个生成的线程都一直从队列里面获取生成的值 while True: msg = message.get() print(msg) for i in range(12): t = threading.Thread(target=producer, args=(i,)) #生成12个线程共同执行producer函数 t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) # #生成10个线程共同执行consumer函数 t.start()
import queue import threading message = queue.Queue(10) def producer(i): message.put(i) #每一个线程把各自生成的数放到队列中,一共放了5个 def consumer(i): msg = message.get() #一共有10个线程过来获取,只获取了5个就没有了,剩下的就一直等待 print(msg) for i in range(5): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
import queue import threading message = queue.Queue(10) def producer(i): message.put(i) #每一个线程把各自生成的数放到队列中,一共放了5个 def consumer(i): msg = message.get_nowait() #一共有10个线程过来获取,只获取了5个就没有了,不等待直接退出 print(msg) for i in range(5): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
五、进程
创建进程:multiprocessing模块
创建一个进程:multiprocessing.Process(target=函数名,args=(参数,) ) *这里的args后面必须是元祖,而且当括号内是一个参数是,第一个参数后加逗号
在windos下不支持fork()的创建,如果想用进程是必须把对进程的操作写在 if __name__=="__main__"下,在linux下不用
我们利用multiprocessing模块创建的线程都是子进程,因为解释器在工作的时候会创建一个主线程(包含在主进程中)来进行工作,我们自己创造的进程都是子进程
import time import multiprocessing def f1(a1,a2): time.sleep(2) print("456") if __name__ == '__main__': t = multiprocessing.Process(target = f1,args = (124,111,)) t.daemon = True#不等待子进程执行完成 t.start() t1 = multiprocessing.Process(target = f1,args = (124,111,)) t1.daemon = True#不等待子进程执行完成 t1.start() print("end")
end
注:主线程是代码从上往下执行(我们看不见),daemon默认为False,即主叫进程等待子进程执行完,一起退出,如果我们设置t1.daemon = True,即主进程不等待子进程执行完就退出
import time import multiprocessing def f1(a1,a2): time.sleep(2) print("456") if __name__ == '__main__': t = multiprocessing.Process(target = f1,args = (124,111,)) t.daemon = True#不等待子进程执行完成 t.start() print("111") t.join(1) print("end")
111
end
注:先执行print("111") 在等待1函数没执行就不等待了,直接执行print("end")
六、
由于多个进程有自己的内存资源,所以修改只是修改了各自的东西,所以不存在线程锁的概念
import multiprocessing li = [] def f(a): li.append(a) #不同的进程往不同的进程里面的li中添加元素 print(li) if __name__ =="__main__": for a in range(5): t = multiprocessing.Process(target=f,args=(a,)) t.start()
[0]
[2]
[1]
[3]
[4]
进程间的数据共享
如果我们想让俩个不同的进程共同修改同一内容的话,即如果你真有需要 要共享数据, multiprocessing提供了两种方式。
1、Array
from multiprocessing import Process,Array temp = Array('i', [11,22,33,44]) def Foo(i): temp[i] = 100+i for item in temp: print(i,'----->',item) for i in range(2): p = Process(target=Foo,args=(i,)) p.start()
2、Manager
import time from multiprocessing import Process,Manager def f(i,dic): dic[i] = 100 + i print(dic) if __name__ == "__main__": manage = Manager() dic = manage.dict() for i in range(5): p = Process(target= f,args=(i,dic,)) p.start() p.join() #必须要加,要不就找不到文件了
{0: 100}
{0: 100, 1: 101}
{0: 100, 1: 101, 2: 102}
{0: 100, 1: 101, 2: 102, 3: 103}
{0: 100, 1: 101, 2: 102, 3: 103, 4: 104}
进程池
由于线程和进程并不是越多越好,而是应该创建最优的数量,所以就有了进程池的概念,创建一个进程池并不会立即创建进程,而是根据需要从进程池里面取的时候才会创建进程
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
- apply
- apply_async
from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) return i+100 def Bar(arg): print(arg) if __name__ == "__main__": pool = Pool(5) #print pool.apply(Foo,(1,)) #print pool.apply_async(func =Foo, args=(1,)).get() for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) print('end') pool.close() pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
注:申请一个进程执行Foo方法,Foo方法没执行完,Bar方法永远不触发,只有Foo方法执行完了,才触发Bar方法,并将其返回值赋值给Bar方法的参数
回调函数:用于验证主函数是否执行完
apply :1、从进程池中申请进程
2、每次申请的一个线程总都自带一个join
3、特点是申请一个执行一个,完了再申请再执行
4、因为每一个申请的进程都有一个join,所以任务执行过程是排队进行的
5、申请的次数和进程池中进程的数量没有关系,因为我每次申请一个,等执行完第一个再执行下一个
from multiprocessing import Pool import time def f1(a): time.sleep(1) print(a) if __name__ == "__main__": pool = Pool(5) for i in range(6): T = pool.apply(func=f1,args=(i,)) print("222222222222") 0 222222222222 1 222222222222 2 222222222222 3 222222222222 4 222222222222 5 222222222222
apply_async:1、从进程池中申请进程
2、每次申请的一个线程都没有join,所以申请以后不等待执行就申请下一个
3、如果申请的数量大于进程池中进程的数量,则每次申请的进程数就是进程池中的进程数,等待执行完毕,进程回到进程池后才可以从新申请
4、每一个任务都是并发执行的,而且可以设置回调函数
5、进程池的执行默认不等待子进程的执行,所以,如果主进程执行完毕后子进程没执行完就直接退出了,所以我们应该在主进程执行后加一个join,这样主进程 就会等待子进程执行完毕后一起退出(自己瞎猜的O(∩_∩)O哈哈~)
from multiprocessing import Pool import time def f1(a): time.sleep(1) print(a) if __name__ == "__main__": pool = Pool(5) for i in range(9): pool.apply_async(func=f1,args=(i,)) print("222222222222") pool.close() pool.join() #进程池的方法 222222222222 222222222222 222222222222 222222222222 222222222222 222222222222 222222222222 222222222222 222222222222 0 1 2 3 4 5 6 7 8