zoukankan      html  css  js  c++  java
  • 并发编程(一)

    1. 操作系统历史

    现在计算机系统是由一个或者多个处理,主存,磁盘,打印机,键盘,鼠标显示器,网络接口以及其他 I/O 设备组成的复杂系统。一个程序员不可能掌握所有系统实现的细节,为此我们就需要为计算机安装一层软件(即操作系统),其目的就是来管理以上设备。

    操作系统是一个用来协调、管理和控制计算机硬件和软件资源的系统程序,位于硬件和应用程序之间。

    1.1 真空管与穿孔卡片

    将对应于程序和数据的已穿孔的纸带或卡片,装入输入级,然后启动输入级把程序和数据输入计算机内存。再通过控制台开关启动程序针对数据运行,计算完毕,打印机输入计算机结果。用户取走结果并取下卡片,下一个用户才可以操作。

    特点

    • 程序员需要在墙上的计时表上预约时间
    • 同一时间只能有一个人在内存中调用 CPU 运行(串行)
    • 顺序计算,人机交互多

    优缺点

    • 优点:程序员在申请的时间段内可以独享整个资源,能够及时调试自己的程序,有 bug 也可以当场处理。

    • 缺点:对计算机提供商来说是一种浪费,人机交互太多(输入 —— 计算 —— 输出,输入 —— 计算 —— 输出 ...)。

    1.2 晶体管和批处理系统

    批处理系统就是将第一代计算机中的多次输入,集成一大波输入,然后顺序计算,再把计算结果集成一大波输出。

    用卡片批量收集好程序后,再人为将这些卡片送到机房读卡机上,然后再装载到次带上。随后,操作员装入一个特殊的程序(操作系统前身),它负责从磁带上读入第一个程序并允许,其输出写到第二个次带上,不打印。等每个程序都执行完毕后,操作员才会将输出磁带拿到另一台 1041 机器上脱机打印。

    优缺点

    • 优点:批处理
    • 缺点:仍然是顺序计算,需要人为输入,输出。

    1.3 集成电路芯片和多道程序设计

    1. SPOOLING 技术

    SPOOLING 技术实现了任何时候将卡片拿到机房都能快速将内容读入到卡片,并且当一个程序结束后,就能直接打印输出。省略了第二代中的 IBM1401 机,再也不用将磁带搬来搬去。

    2. 多道程序设计

    用于解决顺序执行问题

    在 7094机器上(程序运行机器),若当前程序因为磁带等其他 I/O 操作而暂停,CPU 就会处于闲置状态。对于商业处理数据,I/O 等待能达到 80% ~ 90%,造成了很大的 CPU 浪费。

    多道程序设计将内存分为几个部分,每个部分存放不同程序。在一个程序等待 I/O 完成时,另一个程序就可以使用 CPU,CPU 利用率能接近 100%。但也造成了一个问题,它要等所有程序执行完毕后才能输出计算结果。其本质也算是一个批处理系统。

    3. 分时操作系统

    分时操作系统采用的是多道技术多个联机终端的技术,多个程序加载到内存,只运行其中几个,剩余的都在思考。计算机为用户提供快速的交互服务,每个用户对应一个终端(屏幕),一旦程序执行完毕,就可以将结果输出到相应终端上,解决了不需要等所有程序执行完毕才输出的问题。

    特点:多终端 + 一个 CPU + 多道技术

    1.4 个人计算机

    随机大规模集成电路的发展,每个硅芯片上都能集成成千上万个晶体管,个人计算机就此时代到来。

    2. 进程和线程

    2.1 进程

    假如有两个程序 A 和 B,当 A 执行一半时需要读取大量数据输入(I/O 操作),而此时 CPU 只能等到 A 读取完数据才能继续执行,这样就导致 CPU 资源白白浪费。那么有什么技术可以实现在 A 读取数据的时候,让 B 去执行。等到 A 读取完数据后,B 暂停,然后继续执行 A 呢?

    当然可以,进程的实现采用的是类似于 多道程序技术,可以让 A 和 B 随意切换,使得 CPU 资源不浪费。

    但是在这里有个问题:切换,切换的过程会设计到状态的保存、恢复。以及 A、B所需要的系统资源(内存、硬盘、键盘灯)是不一样的,那么就需要有一个东西去记录它们分别需要什么资源,去怎样识别 A 和 B,所有这就有了进程的抽象。

    进程是一个程序在一个数据集上的一次动态执行过程(抽象的),由程序、数据集、进程控制块三部分组成。

    • 程序:是描述进程要完成哪些功能以及如何完成
    • 数据集:程序在执行过程中所需要的资源
    • 进程控制块:记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,是系统感知进程存在的唯一标志。

    2.2 线程

    一个进程只能做一件事,多个进程间每个进程都有自己的内存空间,假如一个程序有多个进程,那么进程与进程间互相通讯就是个问题,相互切换就必然会造成性能上的损失。

    线程是一种轻量级的进程,它是最小的 CPU 执行单元。不能单独存在,只能依托于进程。

    线程的出现降低了上下文切换的消耗,提高了系统的并发性,并突破了一个进程只能干一件事的缺陷,使得进程内并发成为可能。

    线程组成:线程 ID、程序计数器、寄存器集合、堆栈

    2.3 进程和线程区别

    • 一个程序只是有一个进程,一个进程至少有一个线程(进程可以理解为线程的容器)
    • 进程是最小的资源单位,每个进程都有自己独立的内存空间。
    • 线程是最小 CPU 执行单元,是进程的一个实体。
    • 进程在执行过程中拥有独立的内存单元,而多个线程共享内存
    • 每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是不能独立执行,必须依托于应用程序中,由应用程序提供多个线程执行控制
    • 线程是进程的一个实体,进程是抽象的,不拥有系统资源,只有运行中不可少的资源(如:程序计数器、一组寄存器和栈)。但是它可以与同属一个进程的其他线程共享进程所拥有的全部资源
    • 一个线程可以创建和撤销另一个线程,同一个进程中的多个线程间可以并发执行

    3. Python 线程

    3.1 线程调用的两种方式

    Python 以 threading 模块来调用线程,threading 模块建立在 thread 模块上。并对其二次封装,提供了更方便的 API 来处理线程,Python 调用线程的两种方式:

    • 直接调用
    • 继承式调用

    直接调用

    import threading
    import time
    
    def hi(n):		# 定义每个线程要运行的函数
        print('hello %s, time is %s' % (n, time.ctime()))
        time.sleep(5)
        print('hi 结束时间:', time.ctime())
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=hi, args=('rose',))	# 生成一个线程实例(即子线程)
        t1.start()		# 启动线程
    
        t2 = threading.Thread(target=hi, args=('lila',))	# 生成另一个线程实例(另一个子线程)
        t2.start()		# 启动另一个线程
    
        print('主线程结束时间:%s' % time.ctime())
    
    hello rose, time is Tue Dec 18 10:33:25 2018
    hello lila, time is Tue Dec 18 10:33:25 2018
    主线程结束时间:Tue Dec 18 10:33:25 2018
    hi 结束时间: Tue Dec 18 10:33:30 2018
    hi 结束时间: Tue Dec 18 10:33:30 2018
    

    在没有定义子线程时,整个程序就只有一个主线程。定义了子线程后,主线程和子线程同时运行。

    继承式调用

    import time
    import threading
    
    
    class MyThread(threading.Thread):
        def __init__(self, name):
            threading.Thread.__init__(self)
            self.name = name
    
        def run(self):
            print('hello', self.name)
            time.sleep(3)
    
    if __name__ == '__main__':
        t1 = MyThread('rose')
        t2 = MyThread('lila')
        t1.start()
        t2.start()
    
        print('程序结束')
    

    3.2 threading.Thread 的实例方法

    3.2.1 join 方法

    join()方法在这个子线程完成运行之前,它的父线程将会一直被阻塞在 join()方法调用的位置,直至子线程全部结束,才会接着往下执行。

    import time
    import threading
    
    
    def foo():
        print('foo 开始运行时间:%s' % time.ctime())
        time.sleep(3)
        print('foo 结束运行时间:%s' % time.ctime())
    
    
    def bar():
        print('bar 开始运行时间:%s' % time.ctime())
        time.sleep(5)
        print('bar 结束运行时间:%s' % time.ctime())
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=foo)
        t2 = threading.Thread(target=bar)
    
        t1.start()
        t2.start()
    
        t1.join()
        t2.join()
    
        print('总线程结束时间:%s' % time.ctime())
    
    foo 开始运行时间:Tue Dec 18 10:47:54 2018
    bar 开始运行时间:Tue Dec 18 10:47:54 2018
    foo 结束运行时间:Tue Dec 18 10:47:57 2018
    bar 结束运行时间:Tue Dec 18 10:47:59 2018
    总线程结束时间:Tue Dec 18 10:47:59 2018
    

    主线程、子线程 t1、t2 同时执行,主线程运行到 t1.join()、t2.join() 时被阻塞。等到两个子线程都结束后,才接着继续执行。

    t1、t2 开始执行,3s 后 t1 结束执行。2s 后 t2 结束执行,主线程继续执行。

    t2.join()注释后又是怎么样呢:

    t1.join()
    # t2.join()
    
    foo 开始运行时间:Tue Dec 18 10:52:38 2018
    bar 开始运行时间:Tue Dec 18 10:52:38 2018
    foo 结束运行时间:Tue Dec 18 10:52:41 2018
    总线程结束时间:Tue Dec 18 10:52:41 2018
    bar 结束运行时间:Tue Dec 18 10:52:43 2018
    

    可以看到 t1 结束后,主线程也跟着结束,因为没有 t1 结束,t2 没有阻塞主线程。

    3.2.2 setDaemon 方法

    主线程 A 中,创建子线程 B,并设置 B.setDameon(),即将主线程 A 设置为 守护进程。当主线程 A 结束后,不管子线程 B 是否结束,也跟着主线程 A 一起退出,与 join() 方法相反。需要注意的是它必须设置在 start() 方法之前。

    import time
    import threading
    
    def foo():
        time.sleep(3)
        print('子线程结束时间:', time.ctime())
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=foo)
    
        t1.setDaemon(True)
        t1.start()
    
        print('主线程结束时间: ', time.ctime())
    
    主线程结束时间:  Tue Dec 18 16:28:05 2018
    

    主线程 A 结束后,子线程 t1 还没执行完,也跟着结束了。

    没有设置守护进程时:

    主线程结束时间:  Tue Dec 18 16:39:12 2018
    子线程结束时间: Tue Dec 18 16:39:15 2018
    

    主线程与子线程兵分两路,分别执行。当主线程要退出时,会检查子线程是否完成,如果子线程未结束,主线程会等子线程完成后一起退出。

    但有时我们需要只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这是就可以用 setDaemon() 方法了。

    3.2.3 其他方法

    线程对象方法

    • run():线程被 CPU 调用后,自动执行线程对象的 run 方法
    • start():启动线程活动
    • isAlive():检查线程是否活动的
    • getName():返回线程名
    • setName():设置线程名

    threading 模块方法

    • threading.currentThread():返回当前的线程变量
    • threading.current_thread().name:返回当前线程的线程名
    • threading.current_thread().getName():返回当前运行的线程名
    • threading.enumerate():返回一个包含正在运行的线程的 list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程
    • threading.activeCount():返回正在运行的线程数量,与 len(threading.enumerate()) 结果相同。

    4. 多线程

    • 并发:指系统具有处理多个任务(动作)的能力

    • 并行:指系统具有 同时处理多个任务的能力

    可以使用电脑一边看电视,一边听音乐,或者浏览网页,CPU 在这几个任务间随意切换,只是切换的速度很快,人为很难察觉到。

    • 同步:当进程执行到一个 IO(等待外部数据)的时候,—— 等:同步
    • 异步:当进程执行到一个 IO(等待外部数据)的时候,—— 不等:异步。在需要等待的时候去干别的事,一直等到数据接收成功,再回来处理。

    4.1 GIL

    GIL(Global Interpreter Lock):全局解释锁,这是从 Python 解释器层面给进程加的一把锁,其目的是为了保证线程的安全。

    其核心思想是,无论开了个少个线程,有多少个 CPU(多核),Python 解释器在执行时,在同一时刻只允许一个线程运行。

    优缺点

    • 优点:保证了线程安全,省去了自己加锁的麻烦。其他语言如:Java,需要自己受手动给每个线程加锁,以此来保证线程之间的安全。
    • 缺点:在同一时刻只能有一个线程运行,只有等这个线程遇到 切换时才会将 CPU 给另一个线程,在如今这个多核的时代,使多核成为了鸡肋。

    GIL 给每个进程加了一把锁,导致进程中在同一时刻,只能有一个线程被 CPU 调用(可以理解为进程只有一个出口)。

    其他线程要想被调用只能等前一个线程运行完,或者遇到 CPU 切换才能被调用。

    CPU 切换的两种情况

    • IO 切换:程序运行时需要输入输出的情况
    • 时间轮询:又称时间片轮转调度,是一种最古老,最简单,且最公平使用最广泛的算法。每个进程被分配一个时间段,即该进程允许运行的时间。如果在时间片结束时进程还在运行,则 CPU 将剥夺并分配给另一个进程。如果进程在时间片结束前阻塞或结束,则 CPU 当即进行切换。

    程序运行中的任务分类:

    • IO 密集型
      • 即整个程序中有大量 IO 操作。CPU 有空闲时间(time.sleep() 相当于 IO 操作)。
      • CPU 遇到 IO 操作便可以切换到另一个线程,这样Python 多线程就有意义。
      • 推荐使用多线程
    • 计算密集型
      • 即整个程序很少有 IO 操作,有大量计算,只能靠时间轮询来切换 CPU,其他线程需要等时间轮询才能被 CPU 调用。
      • 开多线程没啥意义,不推荐使用多线程
      • 解决办法:多进程 + 协程

    4.2 同步锁

    创建 100 个进程,对公共变量进行减运算:

    import time
    import threading
    
    num = 100   # 设定一个公共变量
    
    
    def foo():
        global num
        temp = num
        time.sleep(0.001)         # 0.1:99、0.01:98、0.001:85
        num = temp - 1      # 对公共变量进行操作
    
    l = []
    
    # 创建 100 个进程
    for i in range(100):
        t = threading.Thread(target=foo)
        l.append(t)
        t.start()
    
    for t in l:       # 等待所有进程执行完毕
        t.join()
    
    print(num)
    

    观察发现在没有添加 sleep 时,结果为 0 。而 time.sleep(0.1)/0.01/0.001 时,结果分别为:99/98/85,不是我们想要的 0 。

    这是因为多个线程在操作同一个资源(公共变量)时,会造成资源破坏。当一个线程被 CPU 调用执行时,可能还没执行完,就被切换到另一个线程。导致了公共变量没有及时更新,下一个进程拿到的有可能还是没有更新的公共变量。

    我们可以通过同步锁来解决:

    # 创建一个同步锁
    lock = threading.Lock()
    
    def foo():
        global num
        lock.acquire()		# 加锁
        temp = num
        time.sleep(0.001)        
        num = temp - 1      # 对公共变量进行操作
        lock.release()		# 释放锁
    

    4.3 死锁和递归锁

    是在线程间共享多个资源时,如果两个线程分别占用一部分资源,并且同时等待对方的资源,就会造成死锁。因为系统判定这部分资源都正在使用。这两个进程在无外力的情况下,会一直等待下去。一些为一个死锁的例子:

    import threading
    import time
    
    
    class MyThread(threading.Thread):
        def A(self):
            # 三个线程同时竞争(申请)A 锁
            lockA.acquire()
            print(self.name, 'gotLockA', time.ctime())
            time.sleep(2)
    
            lockB.acquire()     # 申请 B 锁
            print(self.name, 'gotLockB', time.ctime())
    
            lockB.release()     # 释放 B 锁
            lockA.release()     # 释放 A 锁
    
        def B(self):
            lockB.acquire()     
            print(self.name, 'gotLockB', time.ctime())
            time.sleep(2)
    
            lockA.acquire()
            print(self.name, 'gotLockA', time.ctime())
    
            lockA.release()
            lockB.release()
    
        def run(self):
            self.A()
            self.B()
    
    if __name__ == '__main__':
    	# 创建 A、B 锁
        lockA = threading.Lock()
        lockB = threading.Lock()
    
        l = []
    	# 创建 3 个线程对象
        for i in range(3):
            t = MyThread()
            t.start()
            l.append(t)
    
        for t in l:
            t.join()
    
    Thread-1 gotLockA Wed Dec 19 15:28:08 2018
    Thread-1 gotLockB Wed Dec 19 15:28:10 2018
    Thread-1 gotLockB Wed Dec 19 15:28:10 2018
    Thread-2 gotLockA Wed Dec 19 15:28:10 2018
    

    首先调用执行 A 函数,3个线程同时申请 A 锁,其中 1 个申请成功(假设是 Thread-1),其他 2个继续等待。打印 Thread-1 gotLockA Wed Dec 19 15:28:08 2018,2s 后,Thread-1 申请 B 锁成功,打印 Thread-1 gotLockB Wed Dec 19 15:28:10 2018,然后 Thread-1 解锁 B,再解锁 A。

    Thread-1 继续往下执行,调用 B 函数,获得 B 锁。在同一时刻,剩下的两个线程在 A 函数中,其中一个(假设是 Thread-2)获得了 A 锁。2s 后,Thread-1 想要申请 A 锁,Thread-2 想要申请 B 锁,两个线程互不给对方,从而造成了死锁,程序一直等待在这个位置。

    解决办法:递归锁

    为了支持在同一线程中多次请求同一资源,Python 提供了 可重入锁 的概念:创建一个 RLock 对象threading.RLock()。其内部维护中一个 Lock 和一个计数 counter 变量,counter 记录了 acquire 的次数。没 acquire 一次就记录一次,没 release 一次就见一次,直至一个线程的 acquire 被全 release ,其他线程才能获得自资源。

    # 创建一个 RLock 对象,并将 A、B 锁替换即可
    lockR = threading.RLock()
    

    示例:

    import threading
    import time
    
    
    class MyThread(threading.Thread):
        def A(self):
            # 三个线程同时竞争(申请)A 锁
            # counter = 0
            lockR.acquire()   # counter = 1
            print(self.name, 'gotLockA', time.ctime())
            time.sleep(2)
    
            lockR.acquire()     # counter = 2
            print(self.name, 'gotLockB', time.ctime())
    
            lockR.release()     # counter = 1
            lockR.release()     # counter = 0
    
        def B(self):
            lockR.acquire()
            print(self.name, 'gotLockB', time.ctime())
            time.sleep(2)
    
            lockR.acquire()
            print(self.name, 'gotLockA', time.ctime())
    
            lockR.release()
            lockR.release()
    
        def run(self):
            self.A()
            self.B()
    
    if __name__ == '__main__':
        lockR = threading.RLock()	# 创建 RLock 锁
    
        l = []
    
        for i in range(3):
            t = MyThread()
            t.start()
            l.append(t)
    
        for t in l:
            t.join()
    

    执行 A 函数时,Thread-1 获得 A 锁,counter = 1,再获得 B 锁,counter =2 。释放 B、A 锁,counter = 0 。因为 Thread -1 已经释放了资源,counter=0。所以在执行 B 函数时,三个线程需要再竞争,而不是 Thread-1 继续执行 B 函数。

    Thread-1 gotLockA Wed Dec 19 15:58:09 2018
    Thread-1 gotLockB Wed Dec 19 15:58:11 2018
    Thread-2 gotLockA Wed Dec 19 15:58:11 2018
    Thread-2 gotLockB Wed Dec 19 15:58:13 2018
    Thread-2 gotLockB Wed Dec 19 15:58:13 2018
    Thread-2 gotLockA Wed Dec 19 15:58:15 2018
    Thread-1 gotLockB Wed Dec 19 15:58:15 2018
    Thread-1 gotLockA Wed Dec 19 15:58:17 2018
    Thread-3 gotLockA Wed Dec 19 15:58:17 2018
    Thread-3 gotLockB Wed Dec 19 15:58:19 2018
    Thread-3 gotLockB Wed Dec 19 15:58:19 2018
    Thread-3 gotLockA Wed Dec 19 15:58:21 2018
    

    4.4 Event 对象

    Event 对象用于线程间通信,是由线程设置的信号标志,如果信号标志位为 False,则线程等待直至信号被其他线程设置为 True。即程序中一个线程需要通过判断某个线程的状态来确定自己的下一步操作,就可以用到 event 对象了。

    1. 设置信号

    使用event.set() 方法可以设置 Event 对象内部信号标志为 True。Event 对象提供isSet() 方法来判断其内部信号标志的状态。

    2. 清除信号

    使用 event.clear() 方法可以清除 Event 对象内部的信号标志,即将其设置为 False。

    3. 等待

    Even 对象的 event.wait()方法只有在内部信号为 True 的时候,才能执行并完成返回。当内部信号标志位为 False 时,wait 方式一直等待直至为真才返回。

    示例:

    主线程与子线程通讯

    import time
    import threading
    
    
    def foo():
        print('%s线程正在等待:【%s】' % (threading.current_thread().getName(), time.ctime()))
        print(event.isSet())
        event.wait()	# 当标志位为 False 时会一直等待,直至被重新设置为 True 时才能继续执行
        print('%s线程执行完毕:【%s】' % (threading.current_thread().getName(), time.ctime()))
    
    
    if __name__ == '__main__':
        event = threading.Event()
        t = threading.Thread(target=foo)
        t.start()
    
        print('ending:', time.ctime())
        # time.sleep(3)
        # event.set()		# 将标志位设置为 True
    
    Thread-1线程正在等待:【Wed Dec 19 17:17:27 2018】
    ending: Wed Dec 19 17:17:27 2018
    False
    # Thread-1线程执行完毕:【Wed Dec 19 17:21:02 2018】
    

    线程1和主线程同时执行,主线程执行完毕,线程1 被阻塞在 wait 处。等到主线程重新设置 Event 对象的标志位为 True 时,才继续执行。

    子线程间通讯

    import time
    import threading
    
    def foo():
        print('foo 正在执行:【%s】', time.ctime())
        time.sleep(3)
        # event.set()
    
    def bar():
        print('bar 正在执行:【%s】' % time.ctime())
        print(event.isSet())
        event.wait()
        print('bar 执行完毕:【%s】' % time.ctime())
    
    
    if __name__ == '__main__':
        event = threading.Event()
    
        t1 = threading.Thread(target=foo)
        t2 = threading.Thread(target=bar)
    
        t1.start()
        t2.start()
    
    foo 正在执行:【%s】 Wed Dec 19 17:28:30 2018
    bar 正在执行:【Wed Dec 19 17:28:30 2018】
    False
    # bar 执行完毕:【Wed Dec 19 17:28:33 2018】
    

    多线程阻塞

    import time
    import threading
    
    event = threading.Event()
    
    def foo():
        while not event.isSet():
            print('%s 正在等待:%s' % (threading.current_thread().getName(), time.ctime()))
            event.wait()
        print('%s 已经连接:%s' % (threading.current_thread().getName(), time.ctime()))
    
    for i in range(3):
        t = threading.Thread(target=foo)
        t.start()
    
    print('ending:%s' % time.ctime())
    # time.sleep(3)
    # event.set()
    
    Thread-1 正在等待:Wed Dec 19 17:58:48 2018
    Thread-2 正在等待:Wed Dec 19 17:58:48 2018
    Thread-3 正在等待:Wed Dec 19 17:58:48 2018
    ending:Wed Dec 19 17:58:48 2018
    # Thread-1 已经连接:Wed Dec 19 17:58:51 2018
    # Thread-3 已经连接:Wed Dec 19 17:58:51 2018
    # Thread-2 已经连接:Wed Dec 19 17:58:51 2018
    

    4.5 信号量

    信号量用来控制线程并发数的,管理一个内置的计数器。每当调用 acquire 一次就 -1,调用 release 一次时就 +1,计数器不能小于 0 。当计数器为 0 时,acquire 将阻塞线程至同步锁定状态,直到其他线程调用 release。

    信号量(Semaphore)类似于管理线程的锁,它可以指定同时又几个线程并发:

    import time
    import threading
    
    
    def foo():
        if s.acquire():
            print('%s 正在执行:%s' % (threading.current_thread().getName(), time.ctime()))
            time.sleep(3)
            s.release()
    
    
    if __name__ == '__main__':
        s = threading.Semaphore(3)		# 创建 Semaphore 对象,同时允许 3 个线程实现并发
        for i in range(6):		# 创建 6 个子线程
            t = threading.Thread(target=foo)
            t.start()
    

    创建 Semaphore 对象,同时允许 3 个线程实现并发,睡眠 3s。所有没过 3s 就会有 3个线程并发执行:7

    Thread-1 正在执行:Wed Dec 19 18:00:48 2018
    Thread-2 正在执行:Wed Dec 19 18:00:48 2018
    Thread-3 正在执行:Wed Dec 19 18:00:48 2018
    Thread-4 正在执行:Wed Dec 19 18:00:51 2018
    Thread-5 正在执行:Wed Dec 19 18:00:51 2018
    Thread-6 正在执行:Wed Dec 19 18:00:51 2018
    

    4.6 queue 队列

    queue 队列是一种数据结构,存储方式类似于列表,但是取数据方式不同于列表,多用于多线程。

    在多线程中使用列表存储数据,是不安全的,但是 queue 队列可以保证线程间的数据安全。

    1. 队列数据存取三种模式

    • 先进先出(FIFO):先存的数据,先出,后存的数据后出,queue.Queue()
    • 先进后出(LIFO):同栈,即先存的数据后出,后存的数据先出,queue.LifoQueue()
    • 优先级:优先级越高越先出来,queue.PriorityQueue()

    2. queue 对象常用方法

    import queue
    
    q = queue.Queue(maxsize=0)		# 创建队列队形,maxsize 为队列长度,小于 1 表示队列无限长
    q.put(item, block=True, timeout=None)		# 存储值到队列中,block 可选。若队列为空,block 为 True,put()使用线程暂停。若 block 为 False,put() 将会引发异常
    q.get(block=True, timeout=None)		# 从队列中取值,block 可选。若队列为空,block 为 True,get()使用线程暂停。若 block 为 False,get() 将会引发异常
    q.qsize()	# 返回队列大小
    q.empty()	# 查看队列是否为空,返回布尔值
    q.full()	# 查看队列是否已满,返回布尔值
    q.get_nowait()	# 相当于 q.get(False)
    q.put_nowait(item)	# 相当于 q.put(item, False)
    q.task_done()		# 在完成一项工作之后,会想任务已完成的队列发送一个信号
    q.join()		# 等队列为空后,再执行别的操作
    

    当队列满了时,再往里面 put 值。或队列为空,再往里面取值。若 block 为 True,线程将会暂停,为 False 则会报错:

    q = queue.Queue(maxsize=2)
    q.put(1)
    q.put('2')
    q.put(3, False)
    
      File "E:/PycharmProjects/oldboy/进程线程/queue 队列.py", line 19, in <module>
        q.put(1, False)
      File "C:UsersHJAnaconda3libqueue.py", line 130, in put
        raise Full
    queue.Full
    

    示例:

    import queue
    
    # # 先进先出
    # q = queue.Queue(maxsize=6)    # 3、12、True
    
    # # 先进后出
    # q = queue.LifoQueue(maxsize=6)    # True、12、3
    
    # 优先级
    q = queue.PriorityQueue(maxsize=6)  # [1, '12']、[2, 3]、[3, True]
    q.put([2, 3])
    q.put([1, '12'])
    q.put([3, True])
    
    # q.put(3)
    # q.put('12')
    # q.put(True)
    
    while True:
        data = q.get()
        # print(data)
        print(data[1])
    

    join 和 task_done

    • task_done:在完成一项工作后,向已完成队列发送一个信号
    • join:等队列为空,才能继续执行别的操作,否则一直被阻塞
    import time
    import threading
    
    q = queue.Queue(6)
    
    def put():
        for i in range(6):
            q.put(i)
        q.join()		# 阻塞进程,直至所有任务完成,才会打印 ok
        time.sleep(2)
        print('ok')
    
    
    def get():
        for i in range(6):
            print(q.get())
            q.task_done()	# 每取一次就向队列发送一次信号
    
    t1 = threading.Thread(target=put)
    t2 = threading.Thread(target=get)
    
    t1.start()
    t2.start()
    
    0
    1
    2
    3
    4
    5
    ok
    

    4.7 生产者消费者模型

    通过一个容器(即队列)来解决生产者和消费者的强耦合问题。生产者和消费者不直接通讯,而是通过阻塞队列来进行通讯。生产者生产完后,扔给队列,消费者直接从阻塞队列里取,平衡了生产者和消费者的处理能力。

    在线程世界里,生产数据的线程就是生产者,消费者就是消费数据的线程。在多线程开发中,若生产者处理的速度过快,而消费者处理速度很慢,那么生产者就必须等到消费者处理完,才能继续生产数据。反之亦然,为了平衡两者的速度,我们引入了生产者消费者模型。

    就像餐厅里,厨师做菜不需要和顾客接触,而是通过服务员。

    import threading
    import time
    import queue
    import random
    
    q = queue.Queue()
    
    def produce(name):
        count = 0
        while count < 3:
            print('正在制作中....')
            time.sleep(5)
            q.put(count)
            print('%s制作 %s 完毕:%s' % (name, count, time.ctime()))
            count += 1
            print('ok')
    
    
    def consumer(name):
        count = 0
        while count < 3:
            time.sleep(random.randrange(4))
            data = q.get()
    
            time.sleep(2)
            print('33[32;1m %s 正在吃 %s33[0m:%s' % (name, data, time.ctime()))
            count += 1
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=produce, args=('生产者',))
        t2 = threading.Thread(target=consumer, args=('消费者1',))
        t3 = threading.Thread(target=consumer, args=('消费者2',))
        t4 = threading.Thread(target=consumer, args=('消费者3',))
    
        t1.start()
        t2.start()
        t3.start()
        t4.start()
    
    正在制作中....
    生产者制作 0 完毕:Wed Dec 19 21:35:43 2018
    ok
    正在制作中....
     消费者2 正在吃 0:Wed Dec 19 21:35:45 2018
    生产者制作 1 完毕:Wed Dec 19 21:35:48 2018
    ok
    正在制作中....
     消费者1 正在吃 1:Wed Dec 19 21:35:50 2018
    生产者制作 2 完毕:Wed Dec 19 21:35:53 2018
    ok
     消费者3 正在吃 2:Wed Dec 19 21:35:55 2018
    

    我们引入 join 和 task_done,使得每生产一个,就吃一个:

    import threading
    import time
    import queue
    import random
    
    q = queue.Queue()
    
    
    def produce(name):
        count = 0
        while count < 3:
            print('正在制作中....')
            time.sleep(5)
            q.put(count)
            print('%s制作 %s 完毕:%s' % (name, count, time.ctime()))
    
            count += 1
    
            q.join()
            print('ok')
    
    
    def consumer(name):
        count = 0
        while count < 3:
            time.sleep(random.randrange(4))
            data = q.get()
    
            time.sleep(2)
            q.task_done()
            print('33[32;1m %s 正在吃 %s33[0m:%s' % (name, data, time.ctime()))
    
            count += 1
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=produce, args=('生产者',))
        t2 = threading.Thread(target=consumer, args=('消费者1',))
        t3 = threading.Thread(target=consumer, args=('消费者2',))
        t4 = threading.Thread(target=consumer, args=('消费者3',))
    
        t1.start()
        t2.start()
        t3.start()
        t4.start()
    
    正在制作中....
    生产者制作 0 完毕:Wed Dec 19 21:36:31 2018
     消费者2 正在吃 0:Wed Dec 19 21:36:33 2018
    ok
    正在制作中....
    生产者制作 1 完毕:Wed Dec 19 21:36:38 2018
     消费者1 正在吃 1:Wed Dec 19 21:36:40 2018
    ok
    正在制作中....
    生产者制作 2 完毕:Wed Dec 19 21:36:45 2018
     消费者3 正在吃 2:Wed Dec 19 21:36:47 2018
    ok
    

    5. 多进程

    由于 GIL 的存在,Python 中的多线程并不是真正的多线程。要想充分利用多核 CPU 资源,需要用到多进程。

    5.1 multiprocessing 模块

    multiprocessing 模块可以实现 Python 多进程,其原理是在进程中开多个子进程。用法与 threading 模块类似,其内部方法同名,唯一有区别的是守护进程 multiprocessing 模块是:p.daemon=True

    直接调用

    import time
    import multiprocessing
    
    
    def foo():
        time.sleep(1)
        print('%s 正在执行 %s' % (multiprocessing.current_process().name, time.ctime()))
    
    if __name__ == '__main__':
        for i in range(3):
            p = multiprocessing.Process(target=foo)
            p.start()
    
    Process-1 正在执行 Wed Dec 19 22:29:26 2018
    Process-2 正在执行 Wed Dec 19 22:29:26 2018
    Process-3 正在执行 Wed Dec 19 22:29:26 2018
    

    继承式调用

    class MyProcess(multiprocessing.Process):
        def run(self):
            time.sleep(1)
            print('%s 正在执行 %s' % (multiprocessing.current_process().name, time.ctime()))
    
            
    if __name__ == '__main__':
        for i in range(3):
            p = MyProcess()
            p.start()
    
    MyProcess-2 正在执行 Wed Dec 19 22:32:45 2018
    MyProcess-1 正在执行 Wed Dec 19 22:32:45 2018
    MyProcess-3 正在执行 Wed Dec 19 22:32:45 2018
    

    查看进程 PID

    import os
    import time
    from multiprocessing import Process
    
    
    def foo(title):
        print('title', title)
        print('parent process pid', os.getppid())
        print('process pid', os.getpid())
    
    
    if __name__ == '__main__':
        foo('lila')
        time.sleep(1)
        print('---------------')
    
        p = Process(target=foo, args=('rose',))
        p.start()
    
    title lila
    parent process pid 10476	# pycharm 的进程 PID
    process pid 10640			# 主进程
    ---------------
    title rose
    parent process pid 10640	# 主进程
    process pid 9968		# 子进程
    

    方法属性

    构造方法:

    Process([group [,target [,name [,args [,kwargs]]]]])
    	- group:线程组,目前还没实现。库引用中提示必须是 None
        - target:要执行的方法
        - name:进程名
        - args/kwargs:要传入方法的参数
    

    实例方法:

    is_alive()		# 检查进程是否在运行
    join([timeout])		# 阻塞当前上下文环境的进程,直至调用此方法的进程终止或到达指定的 timeout(可选)
    start()		# 进程准备就绪,等待 CPU 调度
    run()		# start()调用 run 方法,如果实例进程时未制定传入 target,这 start 执行默认 run 方法
    terminate()		# 不管任务是否完成,立即停止工作进程
    

    属性:

    daemon		# 和线程的 setDeamon 一样,设置守护进程
    name		# 进程名字
    pid			# 进程号
    multiprocessing.current_process().name		# 获取当前运行的进程名
    

    5.2 进程间通讯

    Python 进程之间通讯有三种方式:队列 Queue、管道、Managers

    5.2.1 进程队列 Queue

    与创建线程队列一样,不过却是用进程模块 multiprocessing 创建:

    q = multiprocessing.Queue()
    

    示例:

    import multiprocessing
    
    
    def foo(q):
        q.put(12)
        q.put(34)
        q.put(56)
    
    
    if __name__ == '__main__':
        # 创建 queue 进程队列
        q = multiprocessing.Queue()
    
        for i in range(3):
            p = multiprocessing.Process(target=foo, args=(q,))
            p.start()
    
        print(q.get())
        print(q.get())
        print(q.get())
    
    12
    34
    56
    

    5.2.2 管道

    利用 Pipe() 函数创建一个双向管道连接对象,一头连接子线程,另一头连接主线程。

    import multiprocessing
    from multiprocessing import Pipe
    
    
    def foo(conn):
        conn.send([12, {'name':'rose'}, 'str'])
    
        res = conn.recv()
        print('%s:%s' % (multiprocessing.current_process().name, res))
    
    if __name__ == '__main__':
        # 创建双向管道
        parent_conn, son_conn = Pipe()
    
        p = multiprocessing.Process(target=foo, args=(son_conn,))
    
        p.start()
    
        response = parent_conn.recv()
        print('%s:%s' % (multiprocessing.current_process().name, response))
        parent_conn.send('Hello')
    
    MainProcess:[12, {'name': 'rose'}, 'str']
    Process-1:Hello
    

    5.2.3 Managers

    Queue、Pipe 只是实现了数据交互(copy 了一份,进程多就会消耗很大的空间),并没有实现数据共享,即不能改变原始数据。

    Python 提供 Managers 可以实现进程间数据共享,它支持: list、dict、变量、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value、Array。

    import multiprocessing
    from multiprocessing import Manager
    
    
    def foo(d, l, n):
        d[n] = '1'
        d['2'] = 2
    
        l.append(n)
    
    if __name__ == '__main__':
        with Manager() as m:
            # 创建字典
            d = m.dict()
            
            # 创建列表
            l = m.list()
    
            for i in range(3):
                p = multiprocessing.Process(target=foo, args=(d, l, i))
                p.start()
                p.join()
    
            print(d)
            print(l)
    
    {0: '1', '2': 2, 1: '1', 2: '1'}
    [0, 1, 2]
    

    5.3 进程同步

    进程同步锁:与线程同步锁类似,但是进程之间通常不是为了共享数据的同步访问,而是其他资源的同步访问,如:共享屏幕打印数据。

    为每个共享资源创建一个 Lock 对象,当需要访问该资源时。需要事前申请 acquire 方法来获得锁,若其他进程已获得该锁,则需要等释放锁(release 方法)后才能申请使用。

    from multiprocessing import Process, Lock
    
    
    def foo(l, args):
        l.acquire()     # 获得锁
        print('hello', args)
        l.release()     # 释放锁
    
    if __name__ == '__main__':
        # 创建 Lock 对象
        lock = Lock()
    
        for i in range(10):
            p = Process(target=foo, args=(lock, i,))        # 传入子进程中
            p.start()
    

    5.4 进程池

    开多进程是为了实现并发,通常有几个 CPU 就开几个进程,但是进程数一旦过多也会占用很大内存空间,影响效率,因此引入了进程池来线程进程的数量。

    进程池内部维护了一个进程序列,当使用时,则去进程池中获取一个进程。若进程池中没有可用的进程,那么程序就会等待,直至进程池中有可用的进程为止。

    常用方法:

    pool = Pool([nums])   # 创建一个进程池对象,如果不指定进程数目,则默认电脑CPU核心数
    pool.apply(func, args=(), kwds={}) # 同步接口,一次只能实现一个进程运行
    pool.apply_async(func, args=(), kwds={}, callback=None)     # 异步接口,进程池大小有多大就能同时开启几个进程,同时可以回调其他方法
    pool.close()    # 关闭进程池,如果所有操作持续挂起,将在工作进程终止前完成
    pool.join()     # 等待所有工作进程退出,只能在 close() 或 teminate() 之后 
    

    示例1:同时 3 个进程并发运行

    from multiprocessing import Process, Pool
    import time
    
    
    def foo(i):
        time.sleep(1)
        print('%s:%s' %(i, time.ctime()))
    
    if __name__ == '__main__':
        # 创建一个进程池
        pool = Pool(3)
    
        for i in range(10):
            # 子进程的生成由进程池维护
            pool.apply_async(func=foo, args=(i, ))
    
        pool.close()
        pool.join()     # 必须在 close() 之后
    
        print('end')
    

    每隔 1s,三个进程并发运行:

    0:Fri Dec 21 16:57:30 2018
    1:Fri Dec 21 16:57:30 2018
    2:Fri Dec 21 16:57:30 2018
    3:Fri Dec 21 16:57:31 2018
    4:Fri Dec 21 16:57:31 2018
    5:Fri Dec 21 16:57:31 2018
    6:Fri Dec 21 16:57:32 2018
    7:Fri Dec 21 16:57:32 2018
    8:Fri Dec 21 16:57:32 2018
    9:Fri Dec 21 16:57:33 2018
    end
    

    回调 callback()就是某个动作或函数执行完毕后,再执行的函数。主进程中再回调主进程函数,常用于多进程中生成日志文件:

    from multiprocessing import Process, Pool
    import time
    
    
    def foo(i):
        time.sleep(1)
        print('%s:%s' %(i, time.ctime()))
        return i    # 每个进程的返回值,即是回调函数的参数 args
    
    
    def bar(args):
        print(args)
    
    if __name__ == '__main__':
        # 创建一个进程池
        pool = Pool(3)
    
        for i in range(6):
            pool.apply_async(func=foo, args=(i, ), callback=bar)
    
        pool.close()
        pool.join()
    
        print('end')
    
    0:Fri Dec 21 17:08:00 2018
    0
    1:Fri Dec 21 17:08:00 2018
    1
    2:Fri Dec 21 17:08:00 2018
    2
    3:Fri Dec 21 17:08:01 2018
    3
    4:Fri Dec 21 17:08:01 2018
    4
    5:Fri Dec 21 17:08:01 2018
    5
    end
    

    Tips: 使用回调函数,每个进程必须有返回值,其返回值为回调函数的参数(args)。回调函数属于主进程。

    6. 协程

    协程(Coroutine),又称为微线程。区别于多线程、多进程的抢占式模式,它是协作式,其本质单线程的并发,由程序内部控制线程之间的切换。

    协程是用户态的轻量级线程,可以由用户程序自己控制调度(多进程、多线程自动切换,不可控)。

    注意:

    • Python 线程属于内核态的,由操作系统控制调度,一旦遇到 IO 操作(如:IO 操作、时间轮询)就需要将 CPU 执行权限交出给其他线程使用。
    • 单线程内开启协程,一旦遇到 IO,由应用程序控制切换。

    优点

    • 执行效率高,子程序的切换由程序控制,而非线程,因此没有线程切换的消耗。
    • 与多线程比,线程数量越多,协程性能优势越明显。
    • 因为只有一个线程,所有不存在锁机制,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态即可,所有执行效率比多线程高很多。

    缺点:

    • 因为只有一个线程,所有无法有效利用多核。解决办法:多进程+协程(即一个程序开多个进程,每个进程中开多个线程,再每个线程中开协程)。

    • 协程一旦遇到阻塞,将会阻塞整个线程。

    实现协程的三种方式:

    • yield:利用 yield 的 send() 方法,在子程序间切换,手动
    • Greenlet 模块: 进一步封装 yield,手动
    • Gevent 模块: 自动切换

    协程主要解决的还是 IO 操作

    6.1 yield 实现协程

    import time
    
    
    def consumer(name):
        while True:
            print('开始吃包子...')
            s = yield
            print('%s 开始吃第 %s 个包子' % (name, s))
    
    
    def produce():
        r = conn1.__next__()    # 调用生成器
        r = conn2.__next__()
    
        n = 0
        while n < 6:
            time.sleep(1)   
            print('开始制作第 %s 个包子' % n)
            conn1.send(n)   # 将 n 传入 consumer,由 yield 接收
            print('开始制作第 %s 个包子' % (n+1))
            conn2.send(n+1)
            n += 2
    
    
    if __name__ == '__main__':
        conn1 = consumer('rose')    # 生成生成器对象
        conn2 = consumer('lila')
        produce()   # 执行函数
    
    开始吃包子...
    开始吃包子...
    开始制作第 0 个包子
    rose 开始吃第 0 个包子
    开始吃包子...
    开始制作第 1 个包子
    lila 开始吃第 1 个包子
    开始吃包子...
    开始制作第 2 个包子
    rose 开始吃第 2 个包子
    开始吃包子...
    ...
    

    6.2 Greenlet 模块

    greenlet是一个用 C 实现的协程模块,相比与 python 自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为 generator。

    from greenlet import greenlet
    
    
    def foo():
        print('PHP')
        g2.switch()     # 切换到 g2,即执行 bar()
    
    
    def bar():
        print('Python')
        g1.switch()     # 切换到 g1,即执行 foo()
        print('Java')
    
    if __name__ == '__main__':
        g1 = greenlet(foo)      # 创建greenlet 对象,里面封装要执行的函数
        g2 = greenlet(bar)
    
        # 首先启动 bar()
        g2.switch()
    
    Python
    PHP
    Java
    

    6.3 Gevent 模块

    不管是利用Python自带的 yield 实现协程,还是 greenlet 模块,都需要手动去切换,显然这不是我们想要的结果。

    Gevent 模块,通过 greenlet 实现协程,其基本思想是:

    当一个 greenlet 遇到 IO 操作时,如:访问网络,就自动切换到其他的 greenlet。等到 IO 操作完成,再切换回来继续执行。由于 IO 操作非常耗时,使程序处于等待状态,Gevent 可以给我们自动切换,如此就能保证始终都有 greenlet 在运行,而非在等待 IO。

    模拟 IO 操作,实现自动切换,以下为模拟 IO 操作的几种方法:

    gevent.sleep(2)     # gevent 模拟
    
    # 如果要用 time 模块的 sleep() 方法则需要引入 monkey,否则不能识别
    from gevent import monkey; monkey.patch_all()
    
    
    time.sleep(2)
    
    gevent.spawn(func, *args, **kwargs)
    

    由于切换是在 IO 操作时自动完成,所以 Gevent 需要修改 Python 自带的一些标准库,这一过程在启动时通过 monkey patch 完成:

    import gevent
    from gevent import monkey; monkey.patch_all()
    
    
    def f(n):
        for i in range(n):
            print(gevent.getcurrent(), i)
    
    g1 = gevent.spawn(f, 2)
    g2 = gevent.spawn(f, 3)
    g3 = gevent.spawn(f, 3)
    
    gevent.joinall([
        g1, g2, g3
    ])
    
    <Greenlet "Greenlet-0" at 0x3a12c48: f(2)> 0
    <Greenlet "Greenlet-1" at 0x3a12d48: f(3)> 0
    <Greenlet "Greenlet-2" at 0x3a12e48: f(3)> 0
    <Greenlet "Greenlet-0" at 0x3a12c48: f(2)> 1
    <Greenlet "Greenlet-1" at 0x3a12d48: f(3)> 1
    <Greenlet "Greenlet-2" at 0x3a12e48: f(3)> 1
    <Greenlet "Greenlet-1" at 0x3a12d48: f(3)> 2
    <Greenlet "Greenlet-2" at 0x3a12e48: f(3)> 2
    

    可以看到三个 greenlet 依次运行,而非交替运行。要让 greenlet 交替运行,可以通过 gevent.sleep() 交出控制权:

    def f(n):
        for i in range(n):
            print(gevent.getcurrent(), i)
            gevent.sleep()
    
    <Greenlet "Greenlet-0" at 0x3a12c48: f(2)> 0
    <Greenlet "Greenlet-1" at 0x3a12d48: f(3)> 0
    <Greenlet "Greenlet-2" at 0x3a12e48: f(3)> 0
    <Greenlet "Greenlet-0" at 0x3a12c48: f(2)> 1
    <Greenlet "Greenlet-1" at 0x3a12d48: f(3)> 1
    <Greenlet "Greenlet-2" at 0x3a12e48: f(3)> 1
    <Greenlet "Greenlet-1" at 0x3a12d48: f(3)> 2
    <Greenlet "Greenlet-2" at 0x3a12e48: f(3)> 2
    

    示例1:

    import gevent
    
    
    def foo():
        print('PHP')
        gevent.sleep(2)  # 模拟 IO
    
    
    def bar():
        print('Python')
        gevent.sleep(1)
        print('Java')
    
    
    if __name__ == '__main__':
        g1 = gevent.spawn(foo)
        g2 = gevent.spawn(bar)
    
        gevent.joinall([        # 全部阻塞
            g1,
            g2
        ])
    
    PHP
    Python
    Java
    

    爬虫示例:在实际代码中,不可能由我们自己去通过 gevent.sleep() 去切换协程,而是在执行到 IO 操作时自动切换:

    import gevent
    import requests
    import time
    from gevent import monkey; monkey.patch_all()
    
    
    def get_page(url):
        print('get:', url)
        r = requests.get(url, timeout=30)
        if r.status_code == 200:
            print('%d bytes received from %s' % (len(r.text), url))
    
    
    start_time = time.time()
    
    gevent.joinall([
         gevent.spawn(get_page, 'https://www.python.org/'),
         gevent.spawn(get_page, 'https://www.baidu.com/'),
         gevent.spawn(get_page, 'https://github.com/'),
    ])
    
    stop_time = time.time()
    print('run time is %s' % (stop_time - start_time))
    
    get: https://www.python.org/
    get: https://www.baidu.com/
    get: https://github.com/
    2443 bytes received from https://www.baidu.com/
    79919 bytes received from https://github.com/
    48944 bytes received from https://www.python.org/
    run time is 1.4810848236083984
    

    从结果看,三个网络操作并发执行,而且结束顺序不同,只有一个线程。

    7. 事件驱动模型

    7.1 事件驱动模型介绍

    事件驱动编程思想:一种编程范式

    我们知道协程遇到 IO 操作就会切换,但是什么时候切换回来呢?

    传统编程的线性模式:程序开始执行 ——> 代码 A ——> 代码 B ——> 结束,程序的执行都是由用户自己控制,什么时候该执行,什么时候该结束、切换都是自己控制。

    但是协程内部的切换不可能有我们自己切换,只有当有事件触发时,才发切换回来(如:鼠标点击、键盘输入或其他事件),这就涉及到了事件驱动。

    服务器处理程序事件的几种常用模型:

    • 每收到一个请求,就创建一个新的进程,来处理该请求
    • 每收到一个请求,就创建一个新的线程,来处理该请求
    • 每收到一个请求,放入一个事件列表,让主进程通过非阻塞 IO 方式来处理请求

    第三种就是协程、事件驱动的方式,为大多数网络服务器采用的方式。

    在 UI 编程中,常常要通过鼠标点击来切换某个事件。那么服务器是如何知道、获得用户的鼠标点击呢?两种方式:

    1. 开启线程进行鼠标点击循环检测
      • 缺点也很明显,因为要始终开启线程循环监测是否为鼠标点击,这样会造成 CPU 资源的浪费
      • 通常不止有鼠标点击行为,还有别的事件(如:回车),那么就需要开启多个线程去监测。如果接口是阻塞的,那么还要扫描键盘是否被按下。
    2. 事件驱动模型
      • 目前 UI 编程中广泛使用的编程范式
      • 其内部有个事件(消息)队列,鼠标点击,就会往队列中增加一个点击事件
      • 通过循环,不断从队列中取出事件,根据不同事件,调用不同函数 ( 如:onClick() )
      • 事件一般都有个字保存各自的处理函数指针,每种事件都有独立的函数去处理

    7.2 三种编程范式对比

    常用的三种编程范式:

    • 单线程同步
    • 多线程
    • 事件驱动:程序执行由外部事件决定,内部包含一个事件循环,当外部事件发生是使用回调机制来触发相应处理。

    下图展示了三种模式随着事件的推移,程序所做的工作。这个程序有 3 个任务,每个任务都在等待 IO 操作时阻塞自身,其中灰色框为阻塞所花费时间。

    • 单线程同步中:

    任务按照顺序执行,某个任务遇到 IO 阻塞,其他任务必须等待,只有当 IO 操作完成后才能继续执行。任务之间没有相互依赖,却需要花费大量时间去等待,这样就大大降低了程序运行速度。

    • 多线程中:

    3个线程分别处理一个任务,当某个线程阻塞时,由操作系统来切换到另外一个线程执行,与单线程相比,这种方式显然更有效率。但是多线程模式需要考虑共享数据的安全问题,那么就需要锁机制来保证数据的安全,如操作不当就容易造成未知的 bug。

    • 事件驱动中:

    3个任务在一个线程中交错执行,当要处理 IO 操作时,从事件队列中取出事件,并调用执行该事件。这种方式使得程序最大可能节省了等待事件,不需要开启额外的线程,因为是单线程,也不用担心共享数据安全问题。

    其思路是:注册事件 ——> ——> 触发事件

    8. IO 模型

    协程实现 IO 阻塞自动切换,其原理是 IO 多路复用。

    8.1 IO 模型准备

    要理解 IO 模型需要先理解以下几个知识点:

    用户态和内核态

    现代操作系统采用虚拟存储器,32位系统,它的寻址空间(虚拟存储空间)为4G(2^32)。操作系统的核心是内核,它可以访问受保护的内存空间,也有访问底层硬件的权限。为了保证用户进程不能操作内核,保证内核安全,操作系统将虚拟空间分为内核态和用户态。

    针对 linux 操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

    进程切换

    操作系统控制进程的挂起,恢复,这种行为称作进程切换 。从一个进程切换到另一个进程需要保证以下几点:

    • 保存处理机上下文,包括程序计数器和其他寄存器。
    • 更新PCB信息。
    • 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
    • 选择另一个进程执行,并更新其PCB。
    • 更新内存管理的数据结构。
    • 恢复处理上下文

    这样是很耗费资源的。

    进程阻塞(Block)

    正在执行的进程,因为某些事情的发生(如:请求系统资源失败、IO操作等),则由系统将其阻塞,阻塞状态不占用 CPU 资源,同时它也是进程的一种主动行为。

    文件描述符 fd

    文件描述符(File Descriptor)是一个用于描述文件的引用的抽象化的概念。

    文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

    缓冲 IO

    缓冲 IO 又称为标准 IO,是大多数文件系统的默认 IO 。在 Linux 系统中,数据会先被拷贝到内核态中,再拷贝到用户态中,这种来回的数据拷贝对 CPU 和内存的消耗是很大的。

    8.2 四种常用 IO 模型

    五种 IO 模型:

    • 阻塞(blocking IO)
    • IO和非阻塞(non-blocking IO)
    • 多路复用(IO multiplexing)
    • 异步 IO(asynchronous IO)
    • 信号驱动 IO(signal driven IO):不常用

    8.2.1 阻塞(blocking IO)

    阻塞 IO 最明显的例子就是 socket 编程,服务端没有接收到客户端发来的消息就会一直等待。

    用户进程调用 recvfrom 这个系统调用 ,内核开始 IO 的第一阶段:数据准备 。对网络 IO 来说,很多时候还没收到完整的包,那么内核就需要等待(block)。而另一方面用户进程也是阻塞的(block),只有当内核将数据准备好后,将数据从内核态 copy 到用户态,进程才会继续执行。

    特点:

    • 只有一次系统调用(accept 时),数据同步,两次 block
    • 必须等数据准备好,才能继续执行,耗费时间长,CPU 利用率低

    8.2.2 非阻塞(non-blocking IO)

    可以将 socket 设置为 non-blocking ,其流程如下:

    从图中可以看出,用户进程发出一个系统调用,若内核没有准备好,它不会阻塞进程,而是会返回一个 error。过一段时间后用户进程继续发送系统调用,若此时内核已经准备好数据了,那么就将数据 copy 到用户态中,进程继续执行。

    特点:

    • 循环发出系统调用,检查内核是否准备好数据(轮询)
    • 每次发出调用之间,CPU 执行权限在进程上,可以执行别的操作
    • 系统调用太多,数据更新不及时

    Tips: 数据 copy 的过程,进程是阻塞的

    示例:将 socket 设置为非阻塞状态

    服务端:

    # server.py
    from socket import *
    import time
    
    
    server = socket(AF_INET, SOCK_STREAM)
    server.bind(('127.0.0.1', 8000))
    server.listen(5)
    server.setblocking(False)   # 设置为非阻塞
    
    
    while True:
        try:
            conn, addr = server.accept()
    
            data = conn.recv(1024)
            print(data.decode('utf8'))
    
            conn.close()
        except Exception as e:
            print('%s:%s' % (e, time.ctime()))
            time.sleep(4)
    
    server.close()
    

    客户端:

    # client.py
    from socket import *
    
    
    client = socket(AF_INET, SOCK_STREAM)
    client.connect(('127.0.0.1', 8000))
    
    while True:
        #pass
        client.send('Hello Python'.encode('utf8'))
        break
    
    client.close()
    

    将 socket 设置为非阻塞状态,服务端没有接收到客户端发送的数据时,返回一个错误信息并捕捉打印。每隔 4s后再检查一次客户端是否发送来信息,如此循环往复。

    [WinError 10035] 无法立即完成一个非阻止性套接字操作。:Sat Dec 22 13:13:01 2018
    [WinError 10035] 无法立即完成一个非阻止性套接字操作。:Sat Dec 22 13:13:05 2018
    [WinError 10035] 无法立即完成一个非阻止性套接字操作。:Sat Dec 22 13:13:09 2018
    Hello Python
    [WinError 10035] 无法立即完成一个非阻止性套接字操作。:Sat Dec 22 13:13:13 2018
    [WinError 10035] 无法立即完成一个非阻止性套接字操作。:Sat Dec 22 13:13:17 2018
    

    8.2.3 IO 多路复用(IO multiplexing)

    IO 多路复用也叫做 event driven IO,实现方式有:select、poll、epoll。通过监听多个 IO 对象,当 IO 对象有变化(有数据)的时候就通知用户进程。

    特点

    • 优势在于可以处理多个连接,不适用于单个连接 。
    • 通过一种机制一个进程能同时等待多个文件描述符,这些描述符中任意一个进入就绪状态,select() 函数就可以返回。

    • 当用户进程调用了select 时,那么整个进程会被阻塞(block)
    • 与此同时,内核会监听所有 select 负责的 socket
    • 当任何一个 socket 中的数据准备好了,select 就会返回
    • 此时用户进程再调用 read 操作,将数据 copy 到用户态。

    Python 的 select 模块,提供了:select、poll、epoll 三个方法(不同平台提供不同方法,win 上只有 select 方法,Linux 中提供全部三个),分别调用三个不同的方法实现 IO 多路复用。

    网络操作、文件操作、终端操作等均属于 IO 操作,但 win 只支持 socket 操作,其他系统支持其他 IO 操作。但是也无法检测普通文件操作,自动检测文件是否变化。

    select 方法:

    语法结构:

    # 四个参数,前面三个必须,第四个为超时时间(可选)
    readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)
    
    # 当 inputs 句柄发生变化,可读时(accept、read),获取句柄,并添加到 readable 列表中
    # 当 outputs 句柄发生变化时,获取该句柄,并添加到 writeable 列表中
    # 当 inputs 句柄发生错误时,获取并添加到 exceptionl中
    # 为设置超时时间,则 select 会一直阻塞,直到监听的句柄发生变化
    # 设置超时时间,这段时间会一直阻塞,若时间过了句柄没有变化,则返回三个空列表,否则返回相应句柄
    

    用户进程调用 select 监听到某个文件描述符(文件描述符即文件句柄,socket、open files、pipes 等所),对应的内核缓冲区有数据,再调用 accpet 和 recv,将数据 copy 到用户态。

    示例:利用 select 实现 socket 并发聊天

    服务端:

    import socket
    import select
    
    
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('127.0.0.1', 8080))
    sock.listen(5)
    
    inputs = [sock, ]      # 定义监听的套接字对象列表,列表里可以有多个对象
    outputs = []
    
    print('----------------start----------------')
    
    while True:
        # 监听哪个 socket 对象获得,没有客户端连接时会阻塞
        # 监听的inputs中的socket对象内部如果有变化,那么这个对象就会在rlist
        # outputs里有什么对象,wlist中就有什么对象
        # []如果这里的对象内部出错,那会把这些对象加到elist中
        rlist, wlist, elist = select.select(inputs, outputs, [])
    
        print(len(inputs), len(outputs))
    
        # 打印 inputs
        print(inputs)
    
        for obj in rlist:
            # 如果活动对象为 socket,那么将客户端对象加入到监听列表,客户端再发送数据时,触发客户端的对象活动
            if obj == sock:
                conn, addr = obj.accept()
                inputs.append(conn)
    
            else:
                data = obj.recv(1024)
                print(data.decode('utf8'))
                resp = input('回复%s号客户>>>' % (inputs.index(obj)))
                obj.send(resp.encode('utf8'))
    

    客户端:

    from socket import *
    
    
    sock = socket(AF_INET, SOCK_STREAM)
    
    sock.connect(('127.0.0.1', 8080))
    
    while True:
        data = input('>>>').strip()
        sock.send(data.encode('utf8'))
        res = sock.recv(1024)
        print(res.decode('utf8'))
    

    服务端结果:

    ----------------start----------------
    1 0
    [<socket.socket fd=224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>]
    2 0
    [<socket.socket fd=224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, <socket.socket fd=220, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 7150)>]
    hello
    回复1号客户>>>python
    

    第一次循环,inputs 只有服务端的 socket 对象,并将连接的客户端 socket 对象添加到 inputs 列表中。第二次循环时,则执行 else 部分,即与客户端交互。

    客户端结果:

    >>>hello
    python
    

    select、poll 与 epoll 的区别

    • select 采用轮询的方式去监听 socket,最大连接数一般为 1024个,时间复杂度为 O(n)
    • poll 也是采用的轮询方式监听,但没有了个数限制,时间复杂度为 O(n)
    • epoll 连接无限制,每个 socket 对象绑定一个回调函数,当 socket 对象活动则触发回调函数,并把事件活动存入事件列表中,epoll 直接调用事件列表即可。即有变化时主动告知用户进程,时间复杂度为 O(1)

    8.2.4 异步IO(Asynchronous IO)

    用户进程发起系统调用(read操作后),立马去就去做别的事。另一方便,内核收到一个系统调用后,会立刻返回结果(是否准备好数据)。当内核将数据准备完毕并拷贝到用户态后,就会给用户进程发送一个 signal,告诉它 read 操作已经操作完成。

    与其他三种 IO 模型相比,异步 IO 全程几乎无阻塞,这也是效率最高的一种 IO 模型。

    8.2.5 五种IO模型对比

    • 阻塞IO:会一直阻塞进程直至内核将数据拷贝到用户态,同步
    • 非阻塞IO:不断发送系统调用,查看内核是否准备好数据,同步
    • IO多路复用:内核使用select监听socket是否准备好数据(文件句柄变化),一旦准备好就立刻返回,用户进程将调用read操作将数据从内核态拷贝到用户态,同步
    • 异步IO:用户进程向内核发起系统请求后,就会立即返回。内核准备好数据后,则告诉用户进程,全程无阻塞
    • 信号驱动IO:不常用,不做介绍
    • 前面三种都属于同步IO

    非阻塞IO 与异步IO的区别

    非阻塞IO虽然大部分时间不是阻塞的,但它要求进程去主动检查是否准备好。一旦准备好后,会再次调用recvfrom来将数据拷贝到用户态。而异步IO发起系统调用后立马返回,也不用去检查、拷贝数据,相当于完全交给内核去管理。

    8.2.6 selectors 模块

    selectors 模块能根据系统平台,自动选用IO多路复用的方式:

    服务端:

    from socket import *
    import selectors
    
    
    sel = selectors.DefaultSelector()
    
    def accept(sock, mask):
        conn, addr = sock.accept()
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        sel.register(conn, selectors.EVENT_READ, read)
    
    
    def read(conn, mask):
        data = conn.recv(1024)
        if data:
            print('echoing', repr(data), 'to', conn)
            conn.send(data)
        else:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
    
    
    server = socket(AF_INET, SOCK_STREAM)
    server.bind(('127.0.0.1', 8080))
    server.listen(5)
    
    # 设置为非阻塞
    server.setblocking(False)
    
    # sock 对象注册绑定 accept 函数
    sel.register(server, selectors.EVENT_READ, accept)
    
    
    while True:
        # 执行 sel 阻塞监听,当有客户端连接,激活 socket 对象,返回一个存放活动 server 对象相关信息的列表
        events = sel.select()
    
        print(events, type(events))     # events: selector 对象,type:list
    
        for key, mask in events:
            print(mask)         # 1
            print(key.data)     # <function accept at 0x00000000020C4378>
            print(key.fileobj)  # key.fileobj:server 对象
    
            callback = key.data         
    
            callback(key.fileobj, mask)     # accept()/read()
    

    客户端:

    from socket import *
    
    
    client = socket(AF_INET, SOCK_STREAM)
    client.connect(('127.0.0.1', 8080))
    
    while True:
        data = input('>>>').strip()
        client.send(data.encode('utf8'))
        res = client.recv(1024)
        print(res.decode('utf8'))
    
    [(SelectorKey(fileobj=<socket.socket fd=236, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>, fd=236, events=1, data=<function accept at 0x00000000020C4378>), 1)] <class 'list'>
    1
    <function accept at 0x00000000020C4378>
    <socket.socket fd=236, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>
    accepted <socket.socket fd=228, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 8744)> from ('127.0.0.1', 8744)
    [(SelectorKey(fileobj=<socket.socket fd=228, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 8744)>, fd=228, events=1, data=<function read at 0x00000000020C4510>), 1)] <class 'list'>
    1
    <function read at 0x00000000020C4510>
    <socket.socket fd=228, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 8744)>
    echoing b'hello' to <socket.socket fd=228, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1
    

    参考博客

    python网络编程——IO多路复用之select

    IO模式和IO多路复用

  • 相关阅读:
    HttpServletRequest和HttpServletResponse
    JavaWeb核心之Servlet
    XML,,DTD
    类加载器,,,,,反射,,,,,泛型擦除,,,,, 反射配置文件
    TCP通信(单线程多线程)
    UDP与TCP协议
    多线程和线程池 和线程安全
    python os.environ 读取和设置环境变量
    Python+Pytest+Allure+Git+Jenkins接口自动化框架(纯干货)
    idea解决activiti流程图bpmn文件乱码问题
  • 原文地址:https://www.cnblogs.com/midworld/p/14614640.html
Copyright © 2011-2022 走看看