1.线程回调
在线程池/进程池
每次提交任务,都会返回一个表示任务的对象,Future对象
Future对象具备一个绑定方法,add_done_callback 用于指定回调函数
add 意味着可以添加多个回调函数
如果直接使用Thread的话,如何完成回调
from threading import Thread import time def call_back(res): print('任务结果拿到了:%s' % res) def parser(res): print('任务结果拿到了:%s' % res) def task(parser): print('run') time.sleep(1) res = 100 # 表示任务结果 parser(res) # 执行回调函数,并传入任务结果 t = Thread(target=task, args=(call_back,)) # 在这里指定parser也可以 t.start() print('over')
2.线程中的队列
from queue import Queue,LifoQueue,PriorityQueue # 与进程中的Joinablequeue使用方法一模一样,但是不具备IPC # Queue 队列 # q = Queue() # # # 可以往里面添加数据 # q.put('123') # q.put('456') # # # 可以获取数据 # print(q.get()) # print(q.get()) # # # 获取值,设定取值时间,超时报错 # # print(q.get(block=True,timeout=3)) # # # 告诉队列取值已经处理完毕 # q.task_done() # q.task_done() # # 等待队列为空后结束队列 # q.join() # 输出结果 # 123 # 456 # LifoQueue,翻译为last in first out 后进先出,先进后出,模拟堆栈的模式 # 除了与Queue的队列不一样之外,其他的都一样 # lq = LifoQueue() # # lq.put('123') # lq.put('456') # # print(lq.get()) # print(lq.get()) # 输出结果 # 456 # 123 # 具备优先级的队列 # PriorityQueue # 可以存储一个可以比较大小的对象,对象越小,优先级越高,自定义对象不能使用比较运算符,所以不能存储 # q = PriorityQueue() # # q.put('b') # q.put('a') # # print(q.get()) # 会优先得到a
3.事件 Event()
了解Event之前,我们先了解一个案例:
from threading import Thread, Event import time # # is_running = False # def boot_server(): # global is_running # print('正在启动服务器。。。') # time.sleep(3) # print('服务器启动成功!') # is_running = True # # # def connect_server(): # while True: # if is_running: # print('连接服务器成功!') # break # else: # time.sleep(0.5) # print('error,服务器未启动') # # # t1 = Thread(target=boot_server) # t1.start() # # t2 = Thread(target=connect_server) # t2.start()
案例:比如说我们做一个客户端连接服务器的模型,服务器与客户端同时启动 而客户端需要访问服务器,
而服务器启动需要时间,但是两个端口却是同时开启 所以客户端不能及时访问到服务器。
Event方法
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法wait、clear、set
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么执行event.wait 方法时便不再阻塞。
-
clear:将“Flag”设置为False
-
set:将“Flag”设置为True
用 threading.Event 实现线程间通信,使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,
Event默认内置了一个标志,初始值为False。一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知所有等待状态的线程恢复运行。
使用Event的案例:
from threading import Thread, Event import time # 而使用Event方法就可以解决这种问题 boot_event = Event() # boot_event.clear() 恢复事件的状态为False # boot_event.is_set() 返回事件的状态 # boot_event.wait() 等待事件发生,就是等待事件被设置为True # boot_event.set() 设置事件为True def boot_server(): print('正在启动服务器。。。') time.sleep(3) print('服务器启动成功') boot_event.set() # 标记事件已经发生了,将状态设置为True def connect_server(): boot_event.wait() # 等待事件发生,如果状态成为了True,会执行下面的代码 print('连接服务器成功!') t1 = Thread(target=boot_server) t1.start() t2 = Thread(target=connect_server) t2.start()
引子
上一节中我们知道GIL锁将导致CPython无法利用多核CPU的优势,只能使用单核并发的执行。很明显效率不高,那有什么办法能够提高效率呢?
效率要高只有一个方法就是让这个当前线程尽可能多的占用CPU时间,如何做到?
任务类型可以分为两种 IO密集型 和 计算密集型
对于计算密集型任务而言 ,无需任何操作就能一直占用CPU直到超时为止,没有任何办法能够提高计算密集任务的效率,除非把GIL锁拿掉,让多核CPU并行执行。
对于IO密集型任务任务,一旦线程遇到了IO操作CPU就会立马切换到其他线程,而至于切换到哪个线程,应用程序是无法控制的,这样就导致了效率降低。
如何能提升效率呢?想一想如果可以监测到线程的IO操作时,应用程序自发的切换到其他的计算任务,是不是就可以留住CPU?的确如此
单线程实现并发这句话乍一听好像在瞎说
首先需要明确并发的定义
并发:指的是多个任务同时发生,看起来好像是同时都在进行
并行:指的是多个任务真正的同时进行
早期的计算机只有一个CPU,既然CPU可以切换线程来实现并发,那么为何不能再线程中切换任务来并发呢?
上面的引子中提到,如果一个线程能够检测IO操作并且将其设置为非阻塞,并自动切换到其他任务就可以提高CPU的利用率,指的就是在单线程下实现并发。
如何能够实现并发呢
并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那就可以实现单线程并发
python中的生成器就具备这样一个特点,每次调用next都会回到生成器函数中执行代码,这意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着生成器会自动保存执行状态!
案例:yiled实现并发效果
# 使用生成器来实现单线并发多个任务 import time def func1(): a = 1 for i in range(1000000): a += 1 print('a run') time.sleep(3) yield def func2(): res = func1() a = 1 for i in range(1000000): a += 1 print('b run') next(res) st = time.time() func2() print(time.time() - st) ''' 对于纯计算的任务而言,单线程并发反而使执行效率下降了一半左右。所以这样的方案对于纯计算任务而言是没有必要的 我们暂且不考虑这样的并发对程序的好处是什么,在上述代码中。使用yield来切换代码结构非常混乱,如果任务太多, 而且每个都需要切换的话,那么会大大的增加时间。降低了效率,因此就有人专门对yield进行了封装,于是便有了greenlet模块 '''
2.greenlet模块
''' 一个'greenlet’是一个小型的独立伪线程,可以把它想象成一些栈帧,栈是初始调用的函数,而栈顶是当前'greenlet'的暂停位置 你使用'greenlet' 创建一堆这样的堆栈,然后在他们之间跳转执行。跳转必须显式声明的:一个greenlet必须选择要跳转到的另一个greenlet, 这会让前一个挂起,而后一个在此前挂起处恢复执行。不同greenlets之间的跳转称为切换(switching) 。 当你创建一个greenlet时,它得到一个开始时为空的栈;当你第一次切换到它时,它会执行指定的函数,这个函数可能会调用其他函数、切换跳出greenlet等等。 最终栈底的函数执行结束出栈时,这个greenlet的栈又变成空的,这个greenlet也就死掉了。greenlet也会因为一个未捕捉的异常死掉。 ''' import greenlet import time def task1(): print('task1,run') g2.switch() # 切换任务至g2 print('task1 over') g2.switch() # 切换任务至g2 def task2(): print('task2,run') g1.switch() # 切换任务至g1 time.sleep(2) print('task2,over') g1 = greenlet.greenlet(task1) g2 = greenlet.greenlet(task2) g1.switch() print('主 over') ''' 该模块简化了yield复杂的代码结构,实现了单线程下多任务并发,但是无论直接使用yield还是greenlet都不能检测到IO操作 如果遇到IO时同样进入阻塞状态,所以此时的并发是没有任何意义的 因此就延申出了gevent模块,既能检测IO,又能实现单线程并发,注意的是gevent模块自身无法检测IO '''
协程·
需要强调的是:
1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
优点如下:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点如下:
1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程来尽可能提高效率
2. 协程本质是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
1.gevent模块 协程的使用
import time ''' gevent 不具备检测IO的能力,需要为它打补丁,打上补丁之后就能检测IO 注意补丁一定需要打在最上面,必须保证导入模块前就打好补丁 ''' from gevent import monkey monkey.patch_all() from threading import current_thread import gevent def task1(): # print(current_thread(), 1) print('task1,run') time.sleep(3) print('task1 over') def taks2(): # print(current_thread(), 2) print('task2 run') print('task2 over') # spawn 用于创建一个协程任务 g1 = gevent.spawn(task1) g2 = gevent.spawn(taks2) # 任务要执行,必须保证主线程没挂,因为所有协程任务都是主线在执行 # 必须调用join来等待协程任务,理论上等待执行时间最长的任务, # 但是我们在执行过程中并不知道那个任务执行的时间最长,所有全部join # 这里有一个方法可以全部join, gevent.joinall([g1,g2]) print('over')
monkey () 补丁
在Python语言中,monkey patch
指的是对于一个类或者模块所进行的动态修改
2.自定义补丁练习
import json # 自定义json补丁 def dumps(obj): print('这是打完补丁后的dumps函数 哈哈哈哈!') def loads(json_str): print('这是打完补丁后的loads函数 嘻嘻嘻嘻!') def patch_json(): json.dumps = dumps json.loads = loads # 打补丁 patch_json() # 再次调用会执行覆盖的dumps与loads方法 json.dumps('123123') # 输出结果 '这是打完补丁后的dumps函数 哈哈哈哈!' json.loads('123321') # 输出结果 '这是打完补丁后的loads函数 嘻嘻嘻嘻!’