今日内容:
1、进程间互相通信(IPC机制)
2、生产者消费者模型
3、线程理论
4、线程开启的两种方式
5、线程相关属性方法
6、守护线程
7、线程互斥锁
1、进程间相互通信(IPC机制)
主要是一个关于队列的知识:队列=管道+锁
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
maxsize:是指该队列中最大存放个数,省略则为无限大小
q = Queue(4)#生成了一个q队列,该队列最大存放的个数为4 q.put('XX')#往队列里存了第一个值 q.put('DSB') q.put('爱吃') q.put('香肠') print(q.get())#从队列里取出第一个值并且删除该值,队列遵行的是先进先出 print(q.get()) print(q.get()) print(q.get())
其中q.put('XX',block = True,timeout=3)block为True(默认值)时,并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列的空间有剩余,如果超时,则会抛出Queue.Full异常。如果blocked为False,但该队列已满,会立即抛出Queue.Full异常。
相同get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
2、生产者和消费者模式
在并发编程中使用生产者和消费者模式能够解决大多数并发问题,该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
在线程世界里,生产者就是生产数据的线程,消费者就是处理数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者,为了解决这个问题于是引入了生产者和消费者模式。
生产者和消费者模式是通过一个容器来解决生产者和消费者的解耦和问题,生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列相当于一个缓冲区,平衡了生产者和消费者的处理能力。
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print(' 33[45m%s 吃 %s 33[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print(' 33[44m%s 生产了 %s 33[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() p1=Process(target=producer,args=(q,)) c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print('主')
4、线程的介绍:
进程其实是一种资源单位,而线程是一种处理cpu代码的单位
线程其实就是处理代码的过程
线程VS进程
1、统一进程下多个线程共享该进程的资源
2、创建线程的开销远远小于创建进程的开销
3、线程的两种开启方式
方式一:利用threading模块创建线程
from threading import Thread import time def task (name): print('%s is running'%name) time.sleep(2) print('%s is done'%name) if __name__ == '__main__': p = Thread(target=task,args=('子线程',)) p.start() print('主')
方式二:利用自定义类通过继承threading下的Thread来创建线程
from threading import Thread import time class Mythread(Thread): def run(self): print('%s is running'%self.name) time.sleep(2) print('%s is done'%self.name) if __name__ == '__main__': p = Mythread() p.start() print('主')
5、线程相关属性方法
当某些同属于一个进程下的线程可以共享该进程的资源。
解释代码:
from threading import Thread import time,random n = 100 def task(): global n n=0 if __name__ == '__main__': p = Thread(target=task) p.start() p.join() print(n)
可以通过调用OS模块,并通过调用其模块下的.getpid()方法可以查看的其线程的pid号(线程的pid号和该线程所属的进程的pid号相同)
from threading import Thread import os def task(name): print('%s is running'%name,os.getpid()) if __name__ == '__main__': p = Thread(target=task,args=('子线程',)) p.start() p.join() print('主',os.getpid())
可以通过导入threading模块下的active_count,可以查看调用时正在执行的线程数量
from threading import Thread,active_count import os def task(name): print('%s is running'%name,os.getpid()) if __name__ == '__main__': p = Thread(target=task,args=('子线程',)) p.start() # time.sleep(1) print('主',active_count())
可以通过导入threading模块下的current_thread,查看该线程的一些属性
from threading import Thread,active_count,current_thread import time,random import os def task(): print('%s is running'%current_thread().name) if __name__ == '__main__': p = Thread(target=task) p.start() time.sleep(1) print('主',active_count())
6、守护线程
对于主线程来说,运行完毕值得是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就会被回收)。因为主线程的结束意味着进程的结束,进程整体资源都将会被回收,而进程必须保证非守护线程都运行完毕才能结束
from threading import Thread import time def task(name): print('老奴%s存活'%name) time.sleep(3) print('老奴%s正常死亡'%name) if __name__ == '__main__': p = Thread(target=task,args=('阿张',)) p.daemon = True p.start() time.sleep(4) print('主')
7、线程互斥锁
由于在同一进程下多个线程能共享此进程的资源,所以如果多个线程要对一个数据进行更改,又由于创建一个线程的时间是很少的大概就是创建一个进程的百分之一的时间,所以就很有可能导致两个或多个线程在同一时间内操作一个数据,这样就会导致数据不安全,所以我们需要将更改数据的这一串代码加锁,加锁的作用就是让原本并发的线程在运行这一段加锁的代码是变成串行,这样就保证了数据的安全性。
from threading import Thread,Lock import time mutex = Lock() n=100 def task(): global n mutex.acquire() time.sleep(0.1) n-=1 mutex.release() if __name__ == '__main__': p_l =[] for i in range(100): p = Thread(target=task) p_l.append(p) p.start() for i in p_l: i.join() print(n) print('主')