操作系统进程切换的两个条件:1.出现IO操作(在等待的时间内把cpu给其他进程用);
2.固定时间(特别短的时间内切换实现高并发)
上面的这两条都是实现时间上的多路复用,空间上的多路复用是说所有进程一起进入内存,比进程独占内存排队一个一个进更省时间
进程和线程 为什么要有进程? 就是为了实现并发,为了让多个程序同时跑起来 这个时候就涉及到CPU的切换了,整个过程的调度 由操作系统实现,切换就是状态的保存,进程有独立 空间,开销比较大。 为什么要有线程? 共用一套资源还开多进程太费劲了,所以出现了线程解决 这种解决共用一套数据资源的多进程问题 进程是最小的资源管理单位(进程是存放线程的容器) 线程是最小的执行单位,CPU执行的就是线程中的内容 并发和并行 串行:一个一个来 并行:同一时刻三个一起执行,一定是有多核cpu 并发:并发就是一个cpu(单核)去搞,同一时间只能有一个线程在执行, 但是会在极短的时间内迅速切换到其他进程,看起来是同时进行的 同步异步 同步等 异步不等
异步非阻塞,异步是不等待,非阻塞是回调 CPython中因为有GIL锁导致同一时刻同一进程只能有一个线程去执行 线程的建立和管理 2种线程的调用 start()启动 join()和setDaemon(守护的线程正常情况下会跟着主线程一起结束) 主线程一结束,进程就结束 setDaemon:程序直到不存在非守护线程时退出 死锁和递归锁 threading.Lock(),threading下的同步锁 threading.Rlock(),threading下的递归锁 同步锁:因为多个线程在处理公共数据 2个同步锁处理问题时若中间有IO操作会出现死锁的问题,于是就有了递归锁 递归锁内部有计数器,acquire一次+1,release一次- event对象(threading.Event(),多线程下的开关event对象) 如果是锁一定能进行进行acquire和release方法,所以event不是锁 event只是一个对象,它能让多个线程之间进行状态的判断,多线程都能识别event对象 应对两个线程进行通信的场合,线程1被wait住后,线程2达到预定状态后会set一下,让线程1继续走 .wait()方法是判断标识位,如果为False就卡主,如果是True就继续运行 .set()方法是把标识位变为True 多线程中那个线程器起引导作用的哪个线程下面就设置event.set方法,一般一个 其他被支配的线程下面设置event.wait方法,一般多个(其实也不能说引导和支配,都是给自己理解) 注意注意:所有的event在这里都是参数,他真正指向的是threading.Event()的实例 semaphore信号量,也是threading.Semaphore(),也是在threading下的 semaphore也是一把锁,是锁都有acquire和release方法 这个锁的特点是能acquire的线程是有多个的,而且量由我们自己控制 semaphore=threading.Semaphore(5)#5就是上面的量,同时可以由5个线程拿到锁 应用在链接池,一个池子里一批 总结: 所有的锁都是跟线程有关的(虽然GIL是装在解释器上的但是也是被线程竞争的) 上面的这些东东都是Python多线程threading模块下的类的对象(锁和开关)
进程
考虑一个场景:浏览器,网易云音乐以及notepad++ 三个软件只能顺序执行是怎样一种场景呢?另外,假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I/O操作),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源。你是不是已经想到在程序A读取数据的过程中,让程序B去执行,当程序A读取完数据之后,让程序B暂停。聪明,这当然没问题,但这里有一个关键词:切换。
既然是切换,那么这就涉及到了状态的保存,状态的恢复,加上程序A与程序B所需要的系统资源(内存,硬盘,键盘等等)是不一样的。自然而然的就需要有一个东西去记录程序A和程序B分别需要什么资源,怎样去识别程序A和程序B等等(比如读书)。
进程定义:
进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程需要完成哪些功能以及如何完成;数据集则是程序执行过程中所需要使用的资源;进程数据块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
举例说明进程:想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。他有做生日蛋糕的食谱,厨房里有所需的原料:面粉、鸡蛋、糖、香草汁等。在这个比喻中,做蛋糕的食谱就是程序(即用适当形式描述的算法)计算机科学家就是处理器(cpu),而做蛋糕的各种原料就是输入数据。进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。现在假设计算机科学家的儿子哭着跑了进来,说他的头被一只蜜蜂蛰了。计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理蛰伤。这里,我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治),每个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完之后,这位计算机科学家又回来做蛋糕,从他离开时的那一步继续做下去。
线程
线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,使到进程内并发称为可能。
假设,一个文本程序,需要接受键盘输入,将内容显示在屏幕上,还需要保存信息到硬盘中。若只有一个进程,势必造成同一时间只能干一样事的尴尬(当保存时,就不能通过键盘输入内容)。若有多个进程,每个进程负责一个任务,进程A负责接收键盘输入的任务,进程B负责将内容显示在屏幕上的任务,进程C负责保存内容到硬盘中的任务。这里进程A,B,C间的协作涉及到了进程通信问题,而且有共同都需要拥有的东西——文本内容,不停的切换造成性能上的损失。若有一种机制,可以是任务A,B,C共享资源,这样上下文切换所需要保存和恢复的内容就少了,同时又可以减少通信所带来的性能损耗,那就好了。是的,这种机制就是线程。
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由现场呢ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。线程没有自己的系统资源。
进程与线程的关系
进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构上的基础。或者说进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。
线程则是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
进程和线程的关系:
(1)一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。
(2)资源分配给进程,同一进程的所有线程共享改进程的所有资源。
(3)CPU分给线程,即真正在CPU上运行的是线程。
并行和并发
并行处理(Parallel Processing)是计算机系统中能同时执行两个或更多个处理的一种计算方法。并行处理可同时工作于同一程序的不同方面。并行处理可同时工作于同一程序的不同方面。并行处理的主要目的是节省大型和复杂问题的解决时间。并发处理(concurrency Processing):指一个时间段中有几个程序都处于已启动运行到运行到运行完毕之间,且这几个程序都是在同一个处理机(CPU)上运行,但任一个时刻点上只有一个程序在处理机(CPU)上运行。
并发的关键是你有处理多个任务的能力,不一定要同时。并行的关键是你有同时处理多个任务的能力。所以说,并行是并发的子集。
同步和异步
在计算机领域,同步就是只一个进程在执行某个请求的时候,若该请求需要一段时间才能返回值,那么这个进程将会一直等下去,直到收到消息返回信息才继续执行下去;异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程处理,这样可以提高执行的效率。举个例子,打电话是就是同步通信,发短信是就是异步通信。
threading模块
线程对象的创建
Thread类直接创建
#直接通过多线程下的类实例出线程对象 #这个进程里面有三个线程,1个主线程,t1,t2两个子线程 #子线成和主线程是同步开启的,但主线程结束后,进程还没有关闭,子线程全部结束后进程才会关闭
import threading import time def countNum(n): # 定义某个线程要运行的函数 print("running on number:%s" %n) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=countNum,args=(23,)) #生成一个线程实例 t2 = threading.Thread(target=countNum,args=(34,)) t1.start() #启动线程 t2.start() print("ending!")
Thread类继承式创建
#继承Thread式创建 #可能是因为自己没学到家吧,感觉这个不实用,注定会走run函数,可问题是run函数的函数体是一样的啊 import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num=num def run(self): #这个方法必须有,是start()方法里面触发的,没有run就会报错,和socket里面的handle()一样 print("running on number:%s" %self.num) time.sleep(3) t1=MyThread(56) t2=MyThread(78) t1.start() t2.start() print("ending")
Thread类的实例的方法
join()和setDaemon()
# join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。也就是说,子线程出现join后会组织主线程的运行 # setDaemon(True): ''' 将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。 当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成 想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是只要主线程 完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦''' #被设定守护的子线程理论上会和主线程一起运行结束,但是如果有多个子线程,而且不是全都守护线程的话,而且
#主线程比子线程先结束的话主线程会等子线程执行完才会关闭进程,而且这个期间如果被守护的子线程能够执行完
#的话,他也会执行完 import threading from time import ctime,sleep import time def Music(name): print ("Begin listening to {name}. {time}".format(name=name,time=ctime())) sleep(3) print("end listening {time}".format(time=ctime())) def Blog(title): print ("Begin recording the {title}. {time}".format(title=title,time=ctime())) sleep(5) print('end recording {time}'.format(time=ctime())) threads = [] t1 = threading.Thread(target=Music,args=('FILL ME',))#元组形式的参数一定要加","不然会报错 t2 = threading.Thread(target=Blog,args=('python',)) threads.append(t1) threads.append(t2) if __name__ == '__main__': #t2.setDaemon(True) for t in threads: #t.setDaemon(True) #注意:一定在start之前设置 t.start() #t.join() #t1.join() #t2.join() # 考虑这三种join位置下的结果? print ("all over %s" %ctime())
其他方法
Thread实例对象的方法 # isAlive(): 返回线程是否活动的。 # getName(): 返回线程名。 # setName(): 设置线程名。 threading模块提供的一些方法: # threading.currentThread(): 返回当前的线程变量。 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
GIL(全局解释器锁)
''' 定义: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) '''
Python中的线程是操作系统的原生线程,Python虚拟机使用一个全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。
GIL:在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。
在调用任何Python C API之前,要先获得GIL
GIL缺点:多处理器退化为单处理器;优点:避免大量的加锁解锁操作
GIL的早期设计
Python支持多线程,而解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁。 于是有了GIL这把超级大锁,而当越来越多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操作)。慢慢的这种实现方式被发现是蛋疼且低效的。但当大家试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而非常难以去除了。有多难?做个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分成各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,并且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更何况Python这样核心开发和代码贡献者高度社区化的团队呢?
GIL的影响
无论你启多少个线程,你有多少个cpu, Python在执行一个进程的时候会淡定的在同一时刻只允许一个线程运行。
所以,python是无法利用多核CPU实现多线程的。
这样,python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
#coding:utf8 from threading import Thread import time def counter(): i = 0 for _ in range(50000000): i = i + 1 return True def main(): l=[] start_time = time.time() for i in range(2): t = Thread(target=counter) t.start() l.append(t) t.join() # for t in l: # t.join() end_time = time.time() print("Total time: {}".format(end_time - start_time)) if __name__ == '__main__': main() ''' py2.7: 串行:25.4523348808s 并发:31.4084379673s py3.5: 串行:8.62115597724914s 并发:8.99609899520874s '''
解决方案
用multiprocessing替代Thread multiprocessing库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。
#coding:utf8 from multiprocessing import Process import time def counter(): i = 0 for _ in range(40000000): i = i + 1 return True def main(): l=[] start_time = time.time() for _ in range(2): t=Process(target=counter) t.start() l.append(t) #t.join() for t in l: t.join() end_time = time.time() print("Total time: {}".format(end_time - start_time)) if __name__ == '__main__': main() ''' py2.7: 串行:6.1565990448 s 并行:3.1639978885 s py3.5: 串行:6.556925058364868 s 并发:3.5378448963165283 s '''
当然multiprocessing也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocessing由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share memory的方法。这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。
总结:因为GIL的存在,只有IO Bound场景下得多线程会得到较好的性能 - 如果对并行计算性能较高的程序可以考虑把核心部分也成C模块,或者索性用其他语言实现 - GIL在较长一段时间内将会继续存在,但是会不断对其进行改进。所以对于GIL,既然不能反抗,那就学会去享受它吧!
同步锁
import time import threading def addNum(): global num # num -= 1 print("ok") #会立马打印出100个ok,然后在进行下面的步骤,会一直切换到最后然后再给第一个醒的 # lockl.acquire() #获取锁 temp=num #这种情况的结果是99,因为从第一个线程开始保存到内存当中的都是100 time.sleep(0.1) #就这样一直到最后一个线程都是100,最后所有的线程都有自己的状态保存100 num = temp-1 #只能等着收100-1的值也就是99 # lockl.release() #释放锁 num=100 #设定一个共享变量 threads=[] lockl=threading.Lock() #拿到锁对象 for i in range(100): t=threading.Thread(target=addNum) t.start() threads.append(t) for t in threads: #等待所有线程执行完毕 t.join() print("Result: ",num)
锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了改锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:
import threading R=threading.Lock() #这个锁主要是对公共数据用的,其它没用的数据不用加 R.acquire() ''' 对公共数据的操作 ''' R.release()
扩展思考
''' 1.为什么有了GIL,还需要线程同步? 多线程环境下必须存在资源的竞争,那么如何才能保证同一时刻只有一个线程对共享资源进行存取? 加锁,可以保证存取操作的唯一性,从而保证同一时刻只有一个线程对共享数据存取。 通常加锁也有2种不同的粒度的锁: coarse-grained(粗粒度):Python解释器层面维护着一个全局的锁机制,用来保证线程安全。 内核级通过GIL实现的互斥保护了内核的共享资源。 fine-grained(细粒度):那么程序员需要自行地加,解锁来保证线程安全, 用户级通过自行加锁保护的用户程序的共享资源。 2.GIL为什么限定在一个进程上? 你写一个py程序,运行起来本身就是一个进程,这个进程是有解释器来翻译的,所以GIL限定在当前进程; 如果又创建一个子进程,那么两个进程是完全独立的,这个子进程也是有Python解释器来运行的 所以这个子进程也是受GIL影响的 '''
死锁与递归锁
所谓死锁:是指两个或两个以上的进程或线程在执行的过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
import threading import time mutexA=threading.Lock() #同步锁 mutexB=threading.Lock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): self.fun1() self.fun2() def fun1(self): mutexA.acquire() #如果锁被占用,则阻塞在这里,等待锁的释放 print("I am %s,get res:%s---%s" %(self.name,"ResA",time.time())) mutexB.acquire() print("I am %s,get res: %s---%s" %(self.name,"ResB",time.time())) mutexB.release() mutexA.release() def fun2(self): mutexB.acquire() print("I am %s,get res: %s---%s" %(self.name,"ResB",time.time())) time.sleep(0.2) mutexA.acquire() print("I am %s,get res: %s---%s" %(self.name,"ResA",time.time())) mutexA.release() mutexB.release() if __name__ == '__main__': print("start------------%s"%time.time()) for i in range(10): my_thread=MyThread() my_thread.start()
在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
mutex=threading.RLock()
Event对象
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
event.isSet() :返回event的转态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
可以考虑一种应用场景(仅仅作为说明),例如,我们有多个线程Redis队列中读取数据来处理,这些线程都要尝试去连接Redis的服务,一般情况下,如果Redis连接不成功,在各个线程的代码中,都会去尝试重新连接。如果我们想要在启动时确保Redis服务正常,才让那些工作线程去连接Redis服务器,那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作:主线程中会去尝试连接Redis服务,如果正常的话,触发事件,各工作线程会尝试连接Redis服务。
import threading import time import logging logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',) def worker(event): logging.debug('Waiting for redis ready...') event.wait() logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1) def main(): readis_ready = threading.Event() t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1') t1.start() t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2') t2.start() logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event') time.sleep(3) # simulate the check progress readis_ready.set() if __name__=="__main__": main()
threading.Event的wait方法还接受一个超时参数,默认情况下如果事件一致没有发生,wait方法会一直阻塞下去,而加入这个超时参数之后,如果阻塞时间超过这个参数设定的值之后,wait方法会返回。对应于上面的应用场景,如果Redis服务器一致没有启动,我们希望子线程能够打印一些日志来不断地提醒我们当前没有一个可以连接的Redis服务,我们就可以通过设置这个超时参数来达成这样的目的:
def worker(event): while not event.is_set(): logging.debug('Waiting for redis ready...') event.wait(2) logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1)
这样,我们就可以在等待Redis服务启动的同时,看到工作线程里正在等待的情况。
Semaphore(信号量) 可以控制同时获取锁的量
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
import threading import time semaphore = threading.Semaphore(5) def func(): if semaphore.acquire(): print (threading.currentThread().getName() + ' get semaphore') time.sleep(2) semaphore.release() for i in range(20): t1 = threading.Thread(target=func) t1.start()
应用:连接池
与Rlock的区别:Rlock是只能有同一个线程去获取连接,而Semaphore可以是好多个
队列(queue)
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
队列在线程编程中特别有用,因为信息必须在多个线程之间安全地交换。
之前学的数据类型都是线程不安全的,所以我们有了用户锁,需要保护数据的时候自己加锁,但是锁加多长时间,什么时候释放又是问题,比较麻烦,然后就有了队列这个数据类型或者说是数据结构,数据类型或数据结构就是用来存放数据的。
队列先进先出,吃了拉 FIFO,first in first out
优点:线程安全,内部已经加好锁了,学队列就是不想用锁,锁码烦,可以保证同一是可只能有一个线程去队列里面拿东西
get与put方法
#get与put方法 ''' 创建一个“队列”对象 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数 maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。 将一个值放入队列中 q.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值; 第二个block为可选参数,默认为 1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0, put方法将引发Full异常。 将一个值从队列中取出 q.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且 block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。 '''
join与task_done方法
''' join() 阻塞进程,直到所有任务完成,需要配合另一个方法task_done。 def join(self): with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait() task_done() 表示某个任务完成。每一条get语句后需要一条task_done。 import queue q = queue.Queue(5) q.put(10) q.put(20) print(q.get()) q.task_done() print(q.get()) q.task_done() q.join() print("ending!") '''
其他常用方法
''' 此包中的常用方法(q = Queue.Queue()): q.qsize() 返回队列的大小 q.empty() 如果队列为空,返回True,反之False q.full() 如果队列满了,返回True,反之False q.full 与 maxsize 大小对应 q.get([block[, timeout]]) 获取队列,timeout等待时间 q.get_nowait() 相当q.get(False)非阻塞 q.put(item) 写入队列,timeout等待时间 q.put_nowait(item) 相当q.put(item, False) q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() 实际上意味着等到队列为空,再执行别的操作 '''
其他模式
''' Python Queue模块有三种队列及构造函数: 1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize) 2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize) 3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize) import queue #先进后出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) #优先级 q=queue.PriorityQueue() q.put([5,100]) q.put([7,200]) q.put([3,"hello"]) q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data) '''
生产者消费者模型
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。
# import queue # q=queue.Queue(3) #first in first out (FIFO) # Queue类的__init__()中有一个参数maxsize,设置最大put量 #这里设置为3 # q.put(111) # q.put("hello") # q.put(2220) # q.put(222,False) #raise Full # q.put(222) #这是第四次put但是maxsize为3,这次不成功底下都不运行,1个也取不出来 # print(q.get()) # print(q.get()) # print(q.get()) # print(q.get(False)) #raise Empty #--------------------------------------------------------------------------- # import queue # q=queue.Queue() # q.put(111) # q.put(222) # q.task_done() # q.task_done() #如果只有put没有get,put几次就需要几个task_done # q.join() #阻塞进程知道所有任务完成,配合task_done操作 # print("ending") # import queue # q=queue.Queue() # q.put(111) # q.put(222) # #如果只有put没有get,put几次就需要几个task_done # print(q.get()) # # print(q.get()) #如果get和put都出现,task_done # q.task_done() # q.task_done() # # q.get() # q.join() #阻塞进程知道所有任务完成,配合task_done操作 # print("ending") #---------------------------------------------------------------------------- #先进后出 # import queue # q=queue.LifoQueue(5) #last in first out (LIFO) # q.put(111) # q.put(222) # q.put(333) # print(q.get()) #------------------------------------------------------------------------- #按优先级来 # import queue # q=queue.PriorityQueue() #priority 优先权 # q.put([1,"hello"]) #列表形式,第一个元素是定优先级的,第二个元素是你插入的数据 # q.put([0,"hello"]) # q.put([-1,"hello"]) # print(q.get()) # print(q.get()) # print(q.get()) #------------------------------------------------------------------------- #生产者消费者而模型,和线程进程、队列是没有关系的,这是一种设计模式 #我们可以配合队列来模拟这种模型 #这个模型是通过一个容器来解决生产者和消费者的强耦合问题的 #在线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的线程 #A-->B-->C C-->B-->A B就相当于一个缓冲区,平衡生成者和消费者的处理能力 # import queue # import time,threading,random # q=queue.Queue() # def producer(name): #生产数据的函数 # count=0 # while count<10: # print("making....") # time.sleep(2) # # time.sleep(random.randrange(3)) # q.put(count) # print("producer %s has produced %s baozi..." %(name,count)) # count +=1 # q.task_done() # q.join() # print("ok.....") # # def consumer(name): # count=0 # while count<10: # time.sleep(1) # if not q.empty(): #如果队列里面不为空 # data=q.get() # # q.task_done() # # q.join() # print(data) # print(" 33[32;1mConsumer %s has eat %s baozi...33[0m" %(name,data)) # else: # print("-------no baozi anymore-------") # count += 1 # p1=threading.Thread(target=producer,args=("Atai",))#args的参数后面是元组形式,而且一定要加,号哦 # c1=threading.Thread(target=consumer,args=("egon",)) # # c2=threading.Thread(target=consumer,args=("alex",)) # # c3=threading.Thread(target=consumer,args=("yuan",)) # p1.start() # c1.start()
multiprocessing模块
Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
Python进程调用
# Process类调用 from multiprocessing import Process import time def f(name): print('hello', name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin:%s'%i,)) p_list.append(p) p.start() for i in p_list: p.join() print('end') # 继承Process类调用 from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() # self.name = name def run(self): print ('hello', self.name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
process类
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
daemon:和线程的setDeamon功能一样
name:进程名字。
pid:进程号。
from multiprocessing import Process import os import time def info(name): print("name:",name) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("------------------") time.sleep(1) def foo(name): info(name) if __name__ == '__main__': info('main process line') p1 = Process(target=info, args=('alvin',)) p2 = Process(target=foo, args=('egon',)) p1.start() p2.start() p1.join() p2.join() print("ending")
协程 程序员自己切换,之前的是操作系统切换
协程的在单线程实现并发的时候,不用锁不用锁不用锁,而且同一时刻永远是单线程,正好对应Python的傻逼GIL锁。协程最大应用还是在IO操作上
单线程,不存在操作系统切换,又程序员自己切换;
多线程对公共数据进行处理的时候为了数据安全有了锁,协程是单线程不存在任何锁的概念;
但是光有yield实现不了必须得加上底层的监听,这需要用C语言解决
yield与协程
import time """ 传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。 如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。 """ # 注意到consumer函数是一个generator(生成器): # 任何包含yield关键字的函数都会自动成为生成器(generator)对象 def consumer(): r = '' while True: # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回; # yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。 # 当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时, # 就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。 n = yield r if not n: return print('[CONSUMER] ←← Consuming %s...' % n) time.sleep(1) r = '200 OK' def produce(c): # 1、首先调用c.next()启动生成器 next(c) n = 0 while n < 5: n = n + 1 print('[PRODUCER] →→ Producing %s...' % n) # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行; cr = c.send(n) # 4、produce拿到consumer处理的结果,继续生产下一条消息; print('[PRODUCER] Consumer return: %s' % cr) # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。 c.close() if __name__=='__main__': # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。 c = consumer() produce(c) ''' result: [PRODUCER] →→ Producing 1... [CONSUMER] ←← Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 2... [CONSUMER] ←← Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 3... [CONSUMER] ←← Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 4... [CONSUMER] ←← Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 5... [CONSUMER] ←← Consuming 5... [PRODUCER] Consumer return: 200 OK '''
greenlet
greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greenlet是python中实现我们所谓的"Coroutine(协程)"的一个基础库.
from greenlet import greenlet def test1(): print (12) gr2.switch() print (34) gr2.switch() def test2(): print (56) gr1.switch() print (78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
基于greenlet的框架
gevent模块实现协程 有监听IO操作的机制,C+Python实现
Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:
import gevent import time def foo(): print("running in foo") gevent.sleep(2) print("switch to foo again") def bar(): print("switch to bar") gevent.sleep(5) print("switch to bar again") start=time.time() gevent.joinall( [gevent.spawn(foo), gevent.spawn(bar)] ) print(time.time()-start)
当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下:
from gevent import monkey monkey.patch_all() import gevent from urllib import request import time def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) start=time.time() gevent.joinall([ gevent.spawn(f, 'https://itk.org/'), gevent.spawn(f, 'https://www.github.com/'), gevent.spawn(f, 'https://zhihu.com/'), ]) # f('https://itk.org/') # f('https://www.github.com/') # f('https://zhihu.com/') print(time.time()-start)
扩展:
gevent是一个基于协程(coroutine)的Python网络函数库,通过使用greenlet提供了一个在libev事件循环顶部的高级别并发API。
主要特性有以下几点:
<1> 基于libev的快速事件循环,Linux上面的是epoll机制
<2> 基于greenlet的轻量级执行单元
<3> API复用了Python标准库里的内容
<4> 支持SSL的协作式sockets
<5> 可通过线程池或c-ares实现DNS查询
<6> 通过monkey patching功能来使得第三方模块变成协作式
gevent.spawn()方法spawn一些jobs,然后通过gevent.joinall将jobs加入到微线程执行队列中等待其完成,设置超时为2秒。执行后的结果通过检查gevent.Greenlet.value值来收集。
1、关于Linux的epoll机制: epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的 增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。epoll的优点: (1)支持一个进程打开大数目的socket描述符。select的一个进程所打开的FD由FD_SETSIZE的设置来限定,而epoll没有这个限制,它所支持的FD上限是 最大可打开文件的数目,远大于2048。 (2)IO效率不随FD数目增加而线性下降:由于epoll只会对“活跃”的socket进行操作,于是,只有”活跃”的socket才会主动去调用 callback函数,其他 idle状态的socket则不会。 (3)使用mmap加速内核与用户空间的消息传递。epoll是通过内核于用户空间mmap同一块内存实现的。 (4)内核微调。 2、libev机制 提供了指定文件描述符事件发生时调用回调函数的机制。libev是一个事件循环器:向libev注册感兴趣的事件,比如socket可读事件,libev会对所注册的事件 的源进行管理,并在事件发生时触发相应的程序。
4.2.2 官方文档中的示例:
import gevent
from gevent import socket
urls = [‘www.google.com.hk’,’www.example.com’, ‘www.python.org’ ]
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)
[job.value for job in jobs]
[‘74.125.128.199’, ‘208.77.188.166’, ‘82.94.164.162’]
注解:gevent.spawn()方法spawn一些jobs,然后通过gevent.joinall将jobs加入到微线程执行队列中等待其完成,设置超时为2秒。执行后的结果通过检查gevent.Greenlet.value值来收集。gevent.socket.gethostbyname()函数与标准的socket.gethotbyname()有相同的接口,但它不会阻塞整个解释器,因此会使得其他的greenlets跟随着无阻的请求而执行。
4.2.3 Monket patching
Python的运行环境允许我们在运行时修改大部分的对象,包括模块、类甚至函数。虽然这样做会产生“隐式的副作用”,而且出现问题很难调试,但在需要修改Python本身的基础行为时,Monkey patching就派上用场了。Monkey patching能够使得gevent修改标准库里面大部分的阻塞式系统调用,包括socket,ssl,threading和select等模块,而变成协作式运行。
from gevent import monkey ;
monkey . patch_socket ()
import urllib2
通过monkey.patch_socket()方法,urllib2模块可以使用在多微线程环境,达到与gevent共同工作的目的。
4.2.4 事件循环
不像其他网络库,gevent和eventlet类似, 在一个greenlet中隐式开始事件循环。没有必须调用run()或dispatch()的反应器(reactor),在twisted中是有 reactor的。当gevent的API函数想阻塞时,它获得Hub实例(执行时间循环的greenlet),并切换过去。如果没有集线器实例则会动态 创建。
libev提供的事件循环默认使用系统最快轮询机制,设置LIBEV_FLAGS环境变量可指定轮询机制。LIBEV_FLAGS=1为select, LIBEV_FLAGS = 2为poll, LIBEV_FLAGS = 4为epoll,LIBEV_FLAGS = 8为kqueue。
Libev的API位于gevent.core下。注意libev API的回调在Hub的greenlet运行,因此使用同步greenlet的API。可以使用spawn()和Event.set()等异步API。
eventlet实现协程(了解)
eventlet 是基于 greenlet 实现的面向网络应用的并发处理框架,提供“线程”池、队列等与其他 Python 线程、进程模型非常相似的 api,并且提供了对 Python 发行版自带库及其他模块的超轻量并发适应性调整方法,比直接使用 greenlet 要方便得多。
其基本原理是调整 Python 的 socket 调用,当发生阻塞时则切换到其他 greenlet 执行,这样来保证资源的有效利用。需要注意的是:
eventlet 提供的函数只能对 Python 代码中的 socket 调用进行处理,而不能对模块的 C 语言部分的 socket 调用进行修改。对后者这类模块,仍然需要把调用模块的代码封装在 Python 标准线程调用中,之后利用 eventlet 提供的适配器实现 eventlet 与标准线程之间的协作。
虽然 eventlet 把 api 封装成了非常类似标准线程库的形式,但两者的实际并发执行流程仍然有明显区别。在没有出现 I/O 阻塞时,除非显式声明,否则当前正在执行的 eventlet 永远不会把 cpu 交给其他的 eventlet,而标准线程则是无论是否出现阻塞,总是由所有线程一起争夺运行资源。所有 eventlet 对 I/O 阻塞无关的大运算量耗时操作基本没有什么帮助。
IO模型
1 IO模型前戏准备 在进行解释之前,首先要说明几个概念: 用户空间和内核空间 进程切换 进程的阻塞 文件描述符 缓存 I/O 用户空间与内核空间 现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。 操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。 为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。 针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。 进程切换 为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换,这种切换是由操作系统来完成的。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。 从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化: 保存处理机上下文,包括程序计数器和其他寄存器。 更新PCB信息。 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。 选择另一个进程执行,并更新其PCB。 更新内存管理的数据结构。 恢复处理机上下文。 注:总而言之就是很耗资源的 进程的阻塞 正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。 文件描述符fd 文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。 文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。 import socket print(socket.socket()) <socket.socket fd=172, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0> 缓存 I/O 缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。用户空间没法直接访问内核空间的,内核态到用户态的数据拷贝 缓存 I/O 的缺点: 数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。
通过检测IO里面是否有数据来实现并发的效果是通过IO多路复用来实现的,IO多路复用是IO模型的其中一种。
同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non-blocking IO是一个东西。这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同。所以,为了更好的回答这个问题,先限定一下本文的上下文。
本文讨论的背景是Linux环境下的network IO。
上面那些IO都是基于本地的,没有和外部进行传输,不要混淆
每个电脑都有自己的虚拟空间,内核空间和用户空间
Stevens在文章中一共比较了五种IO Model:
- blocking IO (阻塞IO,input,accept,recv等待)
- nonblocking IO (非阻塞IO,在IO的基础上加个setbloking(False)就好了)
- IO multiplexing (IO多路复用,特点:监听多个链接)
- signal driven IO (驱动信号)
- asynchronous IO (异步IO)
由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。
再说一下IO发生时涉及的对象和步骤。
对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:
- 等待数据准备 (Waiting for the data to be ready)
- 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
记住这两点很重要,因为这些IO Model的区别就是在两个阶段上各有不同的情况。
blockingIO(阻塞IO)
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段都被block了。
non-blocking IO(非阻塞IO)
linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断的主动询问kernel数据好了没有。
注意:
在网络IO时候,非阻塞IO也会进行recvform系统调用,检查数据是否准备好,与阻塞IO不一样,”非阻塞将大的整片时间的阻塞分成N多的小的阻塞, 所以进程不断地有机会 ‘被’ CPU光顾”。即每次recvform系统调用之间,cpu的权限还在进程手中,这段时间是可以做其他事情的,
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
#服务端 import time import socket tcp=socket.socket() tcp.bind(("127.0.0.1",8080)) tcp.listen(5) tcp.setblocking(False) while True: try: print("waiting client connection ......") conn,addr=tcp.accept() #进程主动轮询 print("+++",addr) client_message=conn.recv(1024) print(client_message.decode("utf8")) conn.close() #通信完成后直接断掉, except Exception as e: print(e) time.sleep(4) #客户端 import time import socket sk=socket.socket() while True: sk.connect(("127.0.0.1",8080)) print("hello") sk.sendall(bytes("hello","utf8")) time.sleep(2) break
优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在同时执行)。
缺点:任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
IO multiplexing(IO多路复用)
IO multiplexing这个词可能有点陌生,但是如果我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为event driven IO。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
注意1:select函数返回结果中如果有文件可读了,那么进程就可以通过调用accept()或recv()来让kernel将位于内核中准备到的数据copy到用户区。
注意2: select的优势在于可以处理多个连接,不适用于单个连接
#select通过IO多路复用解决soket并发问题 #server端 import socket import select sock=socket.socket() sock.bind(("127.0.0.1",8080)) sock.listen(5) inputs=[sock,] while True: r,w,e=select.select(inputs,[],[],0.5)
#r可读socket对象也就是文件描述符,对应第一个列表,如果检测到第一个列表中的对象中有对象可读了,把这个对象append给r
#w可写socket对象也就是文件描述符,对应第二个列表,如果检测到第二个列表中的对象中有对象可写了,把这个对象append给w print(len(r)) for obj in r: if obj == sock: conn,addr=obj.accept() print(conn) inputs.append(conn) else: data_byte=obj.recv(1024) print(data_byte.decode("utf8")) inp=input("回答%s号客户>>>"%inputs.index(obj)) obj.sendall(inp.encode("utf8")) print(">>:",r)
#client端 import socket sock=socket.socket() sock.connect(("127.0.0.1",8080)) while True: data=input(">>:") sock.send(data.encode("utf8")) rece_data=sock.recv(1024) print(rece_data.decode("utf8")) sock.close()
Asynchronous I/O(异步IO)
linux下的asynchronous IO其实用得很少。先看一下它的流程:
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。但是实现起来比较麻烦。
IO模型比较分析
到目前为止,已经将四个IO Model都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。
先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。
在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。
各个IO Model的比较如图所示:
经过上面的介绍,会发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。
selectors模块
import selectors import socket soc=socket.socket() soc.bind(("127.0.0.1",8080)) soc.listen(5) soc.setblocking(False) sel=selectors.DefaultSelector()#根据具体平台选择最佳IO多路机制,比如Linux选择epoll def read(conn,mask): #通信函数 try: #针对Windows客户端突然断开 data=conn.recv(1024) # if not data: 结局Linux上的客户端突然断开 # sel.unregister(conn) # return print(data.decode("utf8")) data2=input(">>:") conn.send(data2.encode("utf8")) except Exception: sel.unregister(conn) def accept(soc,mask): conn,addr=soc.accept() sel.register(conn,selectors.EVENT_READ,read) sel.register(soc,selectors.EVENT_READ,accept) #注册,这里可以理解为封装,监听谁就注册谁 #注册把套接字对象和所对应的方法封装到一起这里可以理解为封装到key中 #key.data就是通信函数或者链接函数 #key.fileobj就是文件描述符:服务端套接字或conn while True: events=sel.select() #监听 [(key1,mask1),(key2,mask2)....] for key,mask in events: func=key.data #通信函数或连接函数 obj=key.fileobj #文件描述符:服务端套接字soc或conn func(obj,mask) #accept(sock,mask) read(conn,mask)
#client端 import socket sock=socket.socket() sock.connect(("127.0.0.1",8080)) while True: data=input(">>:") sock.send(data.encode("utf8")) rece_data=sock.recv(1024) print(rece_data.decode("utf8")) sock.close()
IO多路复用的实现机制
IO多路复用实现机制:
win:只有select这种模型
Linux:select,poll,epoll
select效率最低,缺点:1.每次调用select需要把所有的fd由用户空间copy到内核空间(因为文件描述符是由用户态创建的,要在内核空间监听他 就要拷贝过去 只不过对于epoll来说 它之前的拷贝过的就不再拷贝了 select每次都会把全部的靠背一遍)(通过一个函数实现)
2.select通过遍历所有的fd是否有数据访问
3.最大连接数(1024),因为是在列表中存放着
poll:在实现上跟select是一样的,只改了最后一个,最大连接数无限(通过一个函数实现) epoll:通过3个函数实现机制
1.第一个函数:创建epoll句柄:将所有的fd(文件描述符)由用户空间copy到内核空间
2.回调函数(某一个函数或某一个动作成功完成之后,会触发的函数):他为所有的fd绑定一个回调函数,一旦有数据访问就触发该回调函数,回调函数将fd放到链表中,也就是说链表中放的就是变化的fd,相当于实现了动态监听
3.第三个函数:检测链表是否为空
selectors.DefaultSelector()会根据平台自动选择最好的IO机制。epoll>poll>select
补充:
对于文件描述符(套接字对象):
1是一个非零整数,不会变
2.收发数据的时候,对于接收端而言,数据先到内核空间,然后copy到用户空间,
内核空间清掉
demon主要用于监听
soket发的东西是被server端的操作系统放到内核空间了操作系统要把东西再给用户空间
切换效率:进程切换效率>线程切换效率>协程切换效率
因为,进程间的切换消耗非常大,他要保存的数据非常多,所以他要远远大于线程
而线程间的切换又比协程大的多,因为协程没有操作系统级别的调用
进程池和线程池
#线程池 # import time # from concurrent.futures import ThreadPoolExecutor # def task(i): # time.sleep(1) # print(i) # p=ThreadPoolExecutor(10) # for row in range(100): # p.submit(task,row) #进程池 # import time # from concurrent.futures import ProcessPoolExecutor # def task(i): # time.sleep(1) # print(i) # p=ProcessPoolExecutor(10) # if __name__ == '__main__': #必须要加这个么 # for row in range(100): # p.submit(task,row)
我们就拿ProcessPoolExecutor介绍下它的原理,引用官方代码注释中的流程图:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
| Executor | | Thread | | |
| | +----------- + | | +-----------+ | |
| | <=> | Work Items | <=> | | <= | Result Q | <= | |
| | +------------+ | | +-----------+ | |
| | | 6: call() | | | | ... | | |
| | | future | | | | 4, result | | |
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+
|
我们结合源码和上面的数据流分析一下:
- executor.map会创建多个_WorkItem对象,每个对象都传入了新创建的一个Future对象。
- 把每个_WorkItem对象然后放进一个叫做「Work Items」的dict中,键是不同的「Work Ids」。
- 创建一个管理「Work Ids」队列的线程「Local worker thread」,它能做2件事:
- 从「Work Ids」队列中获取Work Id, 通过「Work Items」找到对应的_WorkItem。如果这个Item被取消了,就从「Work Items」里面把它删掉,否则重新打包成一个_CallItem放入「Call Q」这个队列。executor的那些进程会从队列中取_CallItem执行,并把结果封装成_ResultItems放入「Result Q」队列中。
- 从「Result Q」队列中获取_ResultItems,然后从「Work Items」更新对应的Future对象并删掉入口。
看起来就是一个「生产者/消费者」模型罢了,错了。我们要注意,整个过程并不是多个进程与任务+结果-2个队列直接通信的,而是通过一个中间的「Local worker thread」,它就是让效率提升的重要原因之一!!!
设想,当某一段程序提交了一个请求,期望得到一个答复。但服务程序对这个请求可能很慢,在传统的单线程环境下,调用函数是同步的,也就是说它必须等到服务程序返回结果后,才能进行其他处理。而在Future模式下,调用方式改为异步,而原先等待返回的时间段,在主调用函数中,则可用于处理其他事物。
Future
Future是常见的一种并发设计模式,在多个其他语言中都可以见到这种解决方案。
一个Future对象代表了一些尚未就绪(完成)的结果,在「将来」的某个时间就绪了之后就可以获取到这个结果。比如上面的例子,我们期望并发的执行一些参数不同的fib函数,获取全部的结果。传统模式就是在等待queue.get返回结果,这个是同步模式,而在Future模式下,调用方式改为异步,而原先等待返回的时间段,由于「Local worker thread」的存在,这个时候可以完成其他工作
在tornado中也有对应的实现。2013年的时候,我曾经写过一篇博客使用tornado让你的请求异步非阻塞,最后也提到了用concurrent.futures实现异步非阻塞的完成耗时任务。
原文链接:点我
协程的简单学习与使用
#生成器 #def my_gen(): # yield 1 # yield 2 # 协程 # def coroutine(): # print('Start') # x = yield 1 # print('Received: %s' % (x)) # # coro = coroutine() # print(coro) # print(next(coro)) # print(coro.send(10)) # def coroutine2(a): # print("Start: %s" % a) # b = yield a # print("Received: b=%s" % b) # c = yield a+b # print("Received: c=%s" % c) # # coro = coroutine2(1) # next(coro) # coro.send(2) # coro.send(10) # 协程与回调,感觉一个函数就解决了,就是要给正在执行的函数传一个参数 # def framework(logic,callback): # s = logic() # print(s) # s = s + " hello" # callback(s) # def logic(): # return 'Logic' # def callback(s): # print(s) # framework(logic,callback) # def framework(logic): # try: # it = logic() # s = next(it) # print(s) # it.send(s) # except StopIteration: # pass # # def logic(): # s = "Logic" # r = yield s # r = r + " hello" # print(r) # # framework(logic) # 协程中的生产者消费者模型 # def consumer(): # while True: # v = yield # print(v) # # def producer(c): # for i in range(10,13): # c.send(i) # # c = consumer() # c.send(None) # 等同于next(c) # producer(c) # c.close() # Raises new GeneratorExit exception inside the generator to terminate the iteration. # def consumer(): # r = '' # while True: # v = yield r # print(v) # r = v*2 # # def producer(c): # for i in range(10,13): # print(i) # r = c.send(i) # 当consumer的循环再次走到yield的时候会把r的值返回回来 # print(r) # # c = consumer() # c.send(None) # producer(c)