1、joinablequeue队列
joinablequeue与queue一样,也是一种队列,其继承自queue,也有queue中的put 与get 方法,但是在joinablequeue中有自己的 task_done 与 join方法
task_done方法:
记录从队列中取出的数据是否执行完毕
join方法:
会等待对列取完后才会执行后续的代码,相当于是一个阻塞操作,与进程的join方法相似
2、joinablequeue方法及守护进程的应用
import time import random from multiprocessing import Process,JoinableQueue def consumer(name,q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print('%s 吃了 %s' %(name,res)) q.task_done() def producer(name,q,food): for i in range(5): time.sleep(random.randint(1,2)) res='%s%s' %(food,i) q.put(res) print('%s 生产了 %s' %(name,res)) if __name__ == '__main__': q=JoinableQueue() #2、生产者们 p1=Process(target=producer,args=('lee',q,'包子')) p2=Process(target=producer,args=('andy',q,'水果')) #3、消费者们 c1=Process(target=consumer,args=('aaa',q)) c1.daemon=True p1.start() p2.start() c1.start() # 确定生产者确确实实已经生产完毕 p1.join() p2.join() p3.join() # 在生产者生产完毕后,开始等待队列中所有的数据被取完,且执行完毕(调用了task_done方法) q.join() print('主进程结束')
3、多线程理论
什么是多线程?
多线程就是多个正在执行的线程
线程就相当于流水线,线程时程序运行的最小单位,是一步一步进行的流程
为什么需要多线程:
在以前的学习中,我们认识了多进程,那为什么还需要多线程呢?
实际上这是一个误区,多进程与多线程是完全不同的两种事物,进程时资源分配的最小单位,而线程时程序执行的最小单位,一个进程中可以有多个线程,在我们创建进程时,实际上已经创建了一条主线程,进程只是资源的存放地 一以及线程的执行地,进程相当于实一个车间,线程相当于实进程中的一条流水线
多线程与多进程相比的优缺点:
进程是一个资源单位,创建进程开辟内存空间,将数据进行导入或者复制,在开启一个主线程,在进行执行
线程是一个执行单位,创建线程的时候既不需要开辟空间,也不需要进行代码或者数据的赋值,只是单纯的创建一个线程,使用进程的各种资源以及数据
在线程中数据是共享的,但是在进程中数据时不能互相访问的
from threading import Thread a = 10 def task(): global a print("子 running...") a = 20 t1 = Thread(target=task) t1.start() t1.join() # 主线程等待子线执行完毕 print(a) # 20
线程的创建
在python中,线程的创建于进程的创建基本相同
import multiprocessing.process t = threading.Thread(target=test) t.strat()
3、多线程创建的两种方式:
与多进程相同,一种方式是直接将要执行的函数作为参数传入到Thread中,另一种是创建自己的类,继承自Thread 类,再重写Thread类中的run方法
4、守护线程
与进程中的守护进程相似,在线程中,可以使用的deamon方法将一个子线程设置为守护线程,当主线程执行完毕后直接将守护线程一起带走
from threading import Thread import time def task(): print("子1running......") time.sleep(100) print("子1over......") def task2(): print("子2running......") time.sleep(4) print("子2over......") t = Thread(target=task) t.daemon = True t.start() t2 =Thread(target=task2) t2.start() print("主over") # 子 1 run 子2 run 主 子over 子2over 结束了
主线程代码执行 完毕后 不会立即结束 会等待其他子线程结束
主 会等待非守护线程 即t2
主线程会等待所有非守护线程结束后结束
守护线程会等到所有非守护线程结束后结束 ! 前提是除了主线程之外 还有后别的非守护
当然如果守护线程已经完成任务 立马就结束了
皇帝如果活着 守护者 妃子死了 皇帝正常运行 皇帝死了 无论守护者是否完成任务 都立即结束
5、互斥锁
在线程中,由于资源是共享的,所以多线程如果对主线程的数据进行修改,那么如果不加锁就会产生 数据的错乱,造成安全问题,所以此时需要进行加锁,在线程中,锁的创建与使用跟多进程完全像似,但是也有些许不同
from threading import Thread,enumerate,Lock import time number = 10 lock = Lock() def task(): global number lock.acquire() a = number time.sleep(0.1) number = a - 1 lock.release() for i in range(10): t = Thread(target=task) t.start() for t in enumerate()[1:]: # print(t) t.join() print(number) # 用于访问当前正在运行的所有线程 # print(enumerate())
线程与进程中的锁的创建有些许不同,在进行进程中锁的创建时,需要将锁加入到if __name__ == “__main__”
中,而在线程中,创建的锁在主线程中创建就可以了,这是因为创建进程时会导入主进程的数据,会产生多把锁,但是创建线程就只是创建了一个流水线,数据等都是使用主线程的,所以只需要在主线程中创建一把锁进行了
6、死锁
死锁问题指的是当程序中使用了两把锁时,而数据只有在拿到两把锁时才可以使用,那么如果一个线程拿到了一把锁,另一个线程拿到了另一把锁,他们都等着对方的锁realease,这样就会产生死锁问题
import time # 盘子 lock1 = Lock() # 筷子 lock2 = Lock() def eat1(): lock1.acquire() print("%s抢到了盘子" % current_thread().name) time.sleep(0.5) lock2.acquire() print("%s抢到了筷子" % current_thread().name) print("%s开吃了!" % current_thread().name) lock2.release() print("%s放下筷子" % current_thread().name) lock1.release() print("%s放下盘子" % current_thread().name) def eat2(): lock2.acquire() print("%s抢到了筷子" % current_thread().name) lock1.acquire() print("%s抢到了盘子" % current_thread().name) print("%s开吃了!" % current_thread().name) lock1.release() print("%s放下盘子" % current_thread().name) lock2.release() print("%s放下筷子" % current_thread().name) t1 = Thread(target=eat1) t2 = Thread(target=eat2) t1.start() t2.start()
7、递归锁
产生死锁的原因有两种,一种是上面描述的多把锁被多个线程持有,这样就会产生死锁问题,另一个问题就是如果只有一把锁,但是一个人使用这把锁acquire了两次就会产生死锁问题
对于这种死锁问题,我们可以通过检查代码来进行避免,但是这个是锁的问题,在python中,设置了另一种锁,叫做递归锁,不会因为一个线程把一把锁进行两次acquire就会死锁
这种锁就是递归锁 ——> Rlock
from threading import RLock, Lock, Thread # l = Lock() # # l.acquire() # print("1") # l.acquire() # print("2") l = RLock() # l.acquire() # print("1") # l.acquire() # print("2") def task(): l.acquire() print("子run......") l.release() # 主线程锁了一次 l.acquire() l.acquire() l.release() l.release() t1 = Thread(target=task) t1.start()
在使用Rlock时与使用Lock完全相同,但是在遇到了上方的第二种死锁问题时就不会阻塞,但是,需要注意的是,执行了几次acquire就需要执行几次release
7、信号量
信号量的意思就是被锁定的代码可以被多个线程访问
其与Lock方法的不同是,Lock只能由一个线程进行访问
信号量:Semaphore
from threading import Semaphore, Thread import time s = Semaphore(5) def task(): s.acquire() print("子run") time.sleep(3) print("子over") s.release() for i in range(10): t = Thread(target=task) t.start()