1.多进程(multiprocessing)- 很大程度上是为了解决GIL问题
import os,time from multiprocessing import Process def info(title): print('title:',title) print('parent process:',os.getppid()) print('process id:',os.getpid()) #获取进程ID def f(name): info('funtion f') print('hello',name) if __name__ == '__main__': info('main process line') time.sleep(1) p = Process(target=info,args=('chris',)) #此时他为子进程 ppid为 上方父进程的pid p.start() p.join()
设置成守护进程的方法:p.daemon=True
其他方法:
(1)is_alive():返回进程是否在进行
(2)join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或到达指定的timeout(可选参数)
(3)terminate():不管任务是否完成,立即停止工作进程
属性:
(1)daemon:和线程的setDaemon功能一样
(2)name:进程名字
(3)pid:进程号
from multiprocessing import Process import time class Myprocess(Process): def __init__(self,num): super(Myprocess,self).__init__() self.num = num def run(self): time.sleep(1) print(self.is_alive()) print(self.num,self.pid) time.sleep(5) if __name__ == '__main__': l = [] for i in range(10): p = Myprocess(i) l.append(p) for p in l: p.start() print('the ending ')
2.进程间通信
在线程里,同一个进程里的所有线程共享这个进程里内存空间所定义的全局变量
(1)队列通信
# # # from multiprocessing import Process # import time # # class Myprocess(Process): # def __init__(self,num): # super(Myprocess,self).__init__() # self.num = num # def run(self): # time.sleep(1) # print(self.is_alive()) # print(self.num,self.pid) # time.sleep(5) # # if __name__ == '__main__': # l = [] # for i in range(10): # p = Myprocess(i) # l.append(p) # for p in l: # p.start() # print('the ending ') import queue,time import multiprocessing def foo(q): time.sleep(1) print('son process',id(q)) q.put(123) q.put('chris') if __name__ == '__main__': # q = queue.Queue() #这是线程队列 q = multiprocessing.Queue() #创建一个进程队列 print('main process',id(q)) p = multiprocessing.Process(target=foo,args=(q,)) #将q队列传过去,其实这个q是复制而来的 p.start() p.join() print(q.get(block=False)) #如果队列为空,这直接报错,而不进行同步 print(q.get(block=False))
(2)管道通信
from multiprocessing import Process,Pipe import time def f(conn): conn.send([12,{'name':'chris'},66]) response = conn.recv() print('收到回复:',response) conn.close() print('子进程ID:',id(conn)) if __name__ == '__main__': parent_conn,chil_conn = Pipe() #生成了管道的两个头 可以分配给主进程和子进程 双向管道 可以在管道内收发消息 print('ID-1',id(chil_conn)) p = Process(target=f,args=(chil_conn,)) p.daemon = True p.start() time.sleep(5) print(parent_conn.recv()) #收到子进程发来的消息 parent_conn.send('儿子你好!') p.join()
(3)Mangers
Queue&Pipe只是实现了数据交互(本质是复制,不能修改),并没实现数据共享,既一个进程去更改另一个进程的数据
能数据共享的类型:列表、字典、变量、互斥锁、递归锁、信号量、队列
from multiprocessing import Process,Manager def f(d,l,i): d[i] = 'chris' # 第一次 d{'name-0':'chris'} l.append(i) # [0,1,2,3,4,0] print('son id',id(d),id(l)) if __name__ == '__main__': with Manager() as manger: d = manger.dict() #进程里创建字典是调用manager内部封装下的dict l = manger.list(range(5)) print('main process:',id(d),id(l)) p_list = [] for i in range(10): p = Process(target=f,args=(d,l,i)) p.start() p_list.append(p) for a in p_list: a.join() print('the end ') print(d) #查看子进程是否修改了数据 print(l)
3.进程同步
虽说进程间的资源不共享,但有时候有些资源需要进程间进行共享(比如共用一块屏幕时,如果不加锁,多个进程之间会互相抢资源,发生串行)
注意!!进程之间一定要传送值,因为进程之间不共享数据!!!所以在主进程创建的变量,子进程要使用的话需要将变量传过去!!!
#进程同步 from multiprocessing import Process,Lock def f(l,i): with l: #相当于 l.acquire() print('hello world %s' %i) if __name__ == '__main__': l = Lock() for i in range(10): Process(target=f,args=(l,i)).start()
4.进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池中没有可供使用的进程,那么程序就会等待,直到进程池中有可用的进程为止
进程池中有两个方法:
apply-同步方法 和 apply_async-异步方法
apply_async三个参数,func,args,和回调函数callback - 就是某个动作或者某个函数执行成功后再去执行的函数 回调函数是在主进程下调用!! 好处:日志记录时,让子进程能够顺利结束,日志记录的任务属于逻辑之外的任务,交给主进程去做就好了 接受到的参数来自子进程的return返回值
#进程池 from multiprocessing import Process,Pool import time,os def foo(i): time.sleep(1) print(i) print('son id', os.getpid()) return i+100 def bar(arg): #接受到的参数来自子进程的return返回值 i+100 print(os.getpid()) print('logger:',arg) if __name__ == '__main__': pool = Pool(5) #创建进程池对象,默认数量为电脑的CPU数量,此时最大进程数为5,4个并行,一个与其中一个CPU并发切换 bar(1) print('----'*5) for i in range(6): # pool.apply(func=foo,args=(i,)) #同步的 pool.apply_async(func=foo,args=(i,),callback=bar) #异步的 回调函数是在主进程下进行的!!而不是在子进程 pool.close() #close 要放在join前面 和join两个缺一不可 pool.join()
5.协程(非抢占式) - 协作式 用户态的切换
主要解决的也是IO操作
又叫微线程 本质:就是一个线程
优势:
1.没有切换的消耗
2.没有锁的概念
有一个问题:能用多核吗?可以采用多进程+协程,一个很好的解决方案,既在多个进程里跑协程
与生成器的交互相似,yield交互的两个方法 (1)next()或__next__() (2)send 将值送到指针停留出的yield位置 再往下找下一个yield
def f(): print('ok') res = yield 1 print(res) print('hello') yield f = f() #得到生成器对象 print(next(f)) # print(f.__next__()) # print(f.__next__()) f.send(5)
本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态
cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长或有一个优先级更高的程序替代了它
ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态
一:其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:
#1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
#2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。、
需要强调的是:
#1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
#2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
优点如下:
#1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
#2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点如下:
#1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
#2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
总结协程特点:
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
生成器协程:
import time,queue def consumer(name): print('----> [%s]ready to eat baozi...' %name) while True: new_baozi = yield print('[%s] is eating baozi %s' %(name,new_baozi)) def producer(): r = con.__next__() r = con2.__next__() n = 0 while 1: time.sleep(1) print('