引子:
因为 有GIL锁,所以cpython无法利用多核CPU的优势,只能使用单核并发的执行,效率明显不高。
对于计算密集型任务而言,无需任何操作就能一直占用cpu到超时,米办法提高效率,除非去锁,让多核CPU并行执行;
对于io密集型任务而言,一旦线程遇到io操作CPU就会且到其他线程,切到的线程不确定,应用程序无法控制,就会降低效率;但是如果一个线程能够检测io操作并将其设置为非阻塞,那不就自动切换到其他任务了吗,也就是在单线程下实现并发
1.单线程实现并发 切换任务+保存状态
1.实现条件:要单线程实现并发得找到一种方案,能够在两个任务之间切换执行并且保存状态。
而在python中生成器就具备这么一个特点那我们不就可以利用生成器来实现并发执行,比如每次调用next都会回到生成器函数中执行代码,这意味着任务间可以切换,并且是基于上一次运行的结果,这意味着生成器会自动保存执行状态。
def task1(): while True: yield print("task1 run") def task2(): g = task1() while True: next(g) print("task2 run") task2()
import time def task1(): a = 0 for i in range(10000000): a += i yield def task2(): g = task1() b = 0 for i in range(10000000): b += 1 next(g) s = time.time() task2() print("并发执行时间",time.time()-s) # 单线程下串行执行两个计算任务 效率反而比并发高 因为并发需要切换和保存 def task1(): a = 0 for i in range(10000000): a += i def task2(): b = 0 for i in range(10000000): b += 1 s = time.time() task1() task2() print("串行执行时间",time.time()-s)
2.greenlet 模块
由于yield切换代码非常混乱,如果任务多,不敢想象,因此有人对yield进行分装,也就有了greenlet
def task1(name): print("%s task1 run1" % name) g2.switch(name) # 切换至任务2 print("task1 run2") g2.switch() # 切换至任务2 def task2(name): print("%s task2 run1" % name) g1.switch() # 切换至任务1 print("task2 run2") g1 = greenlet.greenlet(task1) g2 = greenlet.greenlet(task2) g1.switch("jerry") # 为任务传参数
但是greenlet与yield不能检测io,遇到io同样会阻塞,因此我们需要有一种既可检测io,又可实现单线程并发的方案,于是就有了geven.monkey(补丁)
# gevent 不具备检测IO的能力 需要为它打补丁 打上补丁之后就能检测IO # 注意补丁一定打在最上面 必须保证导入模块前就打好补丁 from gevent import monkey monkey.patch_all() # from threading import current_thread import gevent,time def task1(): print(current_thread(),1) print("task1 run") # gevent.sleep(3) time.sleep(3) print("task1 over") def task2(): print(current_thread(),2) print("task2 run") print("task2 over") # spawn 用于创建一个协程任务 g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) # 任务要执行,必须保证主线程没挂 因为所有协程任务都是主线在执行 ,必须调用join来等待协程任务 # g1.join() # g2.join() # 理论上等待执行时间最长的任务就行 , 但是不清楚谁的时间长 可以全部join gevent.joinall([g1,g2]) print("over")
2.线程中的队列
Queue :与进程中的Joinablequeue 使用方式一模一样 但是 不具备IPC
LifoQueue :last in first out 后进先出 先进 后出 模拟堆栈
PriorityQueue:具备优先级的队列 可以存储一个可以比较大小的对象 比较越小的优先级越高 自定义对象 不能使用比较运算符 所以不能存储
from queue import Queue,LifoQueue,PriorityQueue ===================================================================== q = Queue() # # q.put("123") # q.put("456") # # print(q.get()) # print(q.get()) # # # print(q.get(block=True,timeout=3)) # q.task_done() # q.task_done() # q.join() # print("over") ========================================================================== lq = LifoQueue() # # lq.put("123") # lq.put("456") # # print(lq.get()) # print(lq.get()) ==================================================================== class A(object): def __init__(self,age): self.age = age # def __lt__(self, other): # return self.age < other.age # # def __gt__(self, other): # return self.age > other.age def __eq__(self, other): return self.age == other.age a1 = A(50) a2 = A(50) print(a1 == a2) # print(a1 is a1) # print(pq.get()) # pq = PriorityQueue() # pq.put("a") # pq.put("A") # pq.put("C")
3.协程程的概述
定义:单线程下的并发,又称为微线程,纤程,是一种用户态的轻量级线程,由用户程序自己控制调度的
注:1.python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
2.单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
优点如下:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点如下:
1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程来尽可能提高效率
2. 协程本质是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
import gevent,sys from gevent import monkey # 导入monkey补丁 monkey.patch_all() # 打补丁 import time print(sys.path) def task1(): print("task1 run") # gevent.sleep(3) time.sleep(3) print("task1 over") def task2(): print("task2 run") # gevent.sleep(1) time.sleep(1) print("task2 over") g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) #gevent.joinall([g1,g2]) g1.join() g2.join() # 执行以上代码会发现不会输出任何消息 # 这是因为协程任务都是以异步方式提交,所以主线程会继续往下执行,而一旦执行完最后一行主线程也就结束了, # 导致了协程任务没有来的及执行,所以这时候必须join来让主线程等待协程任务执行完毕 也就是让主线程保持存活 # 后续在使用协程时也需要保证主线程一直存活,如果主线程不会结束也就意味着不需要调用join
需要注意:
1.如果主线程结束了 协程任务也会立即结束。
2.monkey补丁的原理是把原始的阻塞方法替换为修改后的非阻塞方法,即偷梁换柱,来实现IO自动切换
#myjson.py def dump(): print("一个被替换的 dump函数") def load(): print("一个被替换的 load函数") # test.py import myjson import json # 补丁函数 def monkey_pacth_json(): json.dump = myjson.dump json.load = myjson.load # 打补丁 monkey_pacth_json() # 测试 json.dump() json.load() # 输出: # 一个被替换的 dump函数 # 一个被替换的 load函数
4.事件
from threading import Thread,Event import time # is_running = False # # def boot_server(): # global is_running # print("正在启动服务器......") # time.sleep(3) # print("服务器启动成功!") # is_running = True # # # def connect_server(): # while True: # if is_running: # print("链接服务器成功!") # break # else: # time.sleep(0.1) # print("error 服务器未启动!") # # # # t1 = Thread(target=boot_server) # t1.start() # # # t2 = Thread(target=connect_server) # t2.start() boot_event = Event() # boot_event.clear() 回复事件的状态为False # boot_event.is_set() 返回事件的状态 # boot_event.wait()等待事件发生 ,就是等待事件被设置为True # boot_event.set() 设置事件为True def boot_server(): print("正在启动服务器......") time.sleep(3) print("服务器启动成功!") boot_event.set() # 标记事件已经发生了 def connect_server(): boot_event.wait() # 等待事件发生 print("链接服务器成功!") t1 = Thread(target=boot_server) t1.start() t2 = Thread(target=connect_server) t2.start()