同步与异步
-
同步与异步
-
阻塞与非阻塞
程序在运行中表现的状态分为三种:阻塞、运行、就绪
阻塞:程序遇到IO阻塞。程序遇到IO阻塞会立马挂起,CPU马上切换,等到IO结束之后,再执行。
非阻塞:程序没有IO或者遇到IO通过某种手段让CPU去执行其他任务,尽可能的占用CPU。
-
同步与异步
从线程发布任务的角度来讲:
同步:任务发出去之后,等待,直到这个任务最终结束之后,给我一个返回值,在发布下一个任务。
from concurrent.futures import ProcessPoolExecutor import time def task(): print('task is running') time.sleep(1) ruturn f'task is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) for i in range(8): obj = p.submit(task) print(obj.result()) # 发出一个任务后,直到得到返回值才发布下一个任务 # task is running # task is finish # ...
异步:所有任务同时发出,直接执行下一行
from concurrent.futures import ProcessPoolExecutor import time def task(): print('task is running') time.sleep(1) return f'task is finish' if __name__ == '__main__': p = ProcessPoolExecutor(4) obj_list = [] for i in range(8): obj = p.submit(task) # 发出任务后直接执行下一行 obj_list.append(obj) time.sleep(4) # 等待所有所有任务执行完毕 p.shutdown(wait=True) # 1.阻止再向进程池投放新任务;2.wait=True十个任务是10,一个任务完成-1,直到为0,执行下一行,与join相似。 for i in obj_list: print(i.result()) # 异步回收任务的方式一:将所有任务结果统一回收 # task is running # ... # task is running # task is finish # ...
以上看出:
同步发布任务:先把第一个任务交给进程,等到第一个进程完成后,再将第二个任务交给交给下一个进程。
异步发布任务:直接将10个任务发布给4个进程,不管结果如何,直接执行下一行代码。
-
-
异步+调用机制
以爬虫为例。
爬虫:利用requests模块功能模拟浏览器封装头,给服务器发送一个请求,骗过服务器之后,服务器反回一个文件。拿到文件,进行数据清洗,获取想要的信息。
分两步:1.爬取服务器文件(IO阻塞)。2.拿到文件进行数据分析(非IO或IO极少)。
第一种:利用多进程并发爬取多个网页
from concurrent.futures import ProcessPoolExecutor import requests import time import os import random def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取{url}') time.sleep(random.randint(1,3)) if response.status_code == 200: return response.text def parse(text): print(f'{os.getpid()} 分析结果:{len(text)}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.sohu.com', 'https://www.youku.com', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] pool = ProcessPoolExecutor(4) obj_list = [] for url in url_list: obj = pool.submit(get,url) obj_list.append(obj) pool.shutdown(wait=True) for obj in obj_list: parse(obj.result()) ''' 11256 正在爬取:http://www.JD.com 13648 正在爬取:http://www.JD.com 12452 正在爬取:http://www.JD.com 7984 正在爬取:http://www.taobao.com 11256 正在爬取:http://www.baidu.com 12452 正在爬取:https://www.sohu.com 11256 正在爬取:https://www.youku.com 7984 正在爬取:http://www.sina.com.cn 13648 正在爬取:https://www.sohu.com 12452 正在爬取:https://www.youku.com 1268 分析结果:141787 1268 分析结果:83871 1268 分析结果:83871 1268 分析结果:83871 1268 分析结果:2381 1268 分析结果:182306 1268 分析结果:714531 1268 分析结果:571607 1268 分析结果:182306 1268 分析结果:714860 '''
此方法存在问题:1.分析结果过程是串行,效率低。2.只有等所有结果全部爬取完之后,在进行分析。
第二种:异步发出10个爬取网页+分析的任务,然后4个进程并发(并行)的完成4个爬取网页+分析任务然后谁先完成,谁先进行爬取网页+分析任务,直到10个爬取+分析任务全部完成完成。
from concurrent.futures import ProcessPoolExecutor import requests import time import os import random def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取{url}') time.sleep(random.randint(1,3)) if response.status_code == 200: parse(response.text) # 在爬取函数中直接调用分析函数,就相当于在执行爬取任务之后立马执行分析,与此同时其他三个进程也在进行相同的步骤,相当于四个进程并发(并行)的执行,效率明显提高。 def parse(text): print(f'{os.getpid()} 分析结果:{len(text)}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.sohu.com', 'https://www.youku.com', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get,url) ''' 13764 正在爬取http://www.JD.com 6024 正在爬取http://www.JD.com 408 正在爬取http://www.JD.com 13684 正在爬取http://www.taobao.com 6024 分析结果:83827 6024 正在爬取http://www.baidu.com 13764 分析结果:83827 6024 分析结果:2381 13764 正在爬取https://www.sohu.com 6024 正在爬取https://www.youku.com 408 分析结果:83827 13764 分析结果:182298 408 正在爬取http://www.sina.com.cn 13764 正在爬取https://www.sohu.com 13684 分析结果:141787 13684 正在爬取https://www.youku.com 408 分析结果:571491 6024 分析结果:712405 13764 分析结果:182298 13684 分析结果:712349 '''
虽然这样解决了以上两个问题,但这样两个任务有耦合性,对其进行解耦,就要用到回调函数。
第三种:
from concurrent.futures import ProcessPoolExecutor import requests import time import os import random def get(url): response = requests.get(url) print(f'{os.getpid()} 正在爬取{url}') time.sleep(random.randint(1,3)) if response.status_code == 200: return response.text def parse(obj): print(f'{os.getpid()} 分析结果:{len(obj.result())}') if __name__ == '__main__': url_list = [ 'http://www.taobao.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.JD.com', 'http://www.baidu.com', 'https://www.sohu.com', 'https://www.youku.com', 'http://www.sina.com.cn', 'https://www.sohu.com', 'https://www.youku.com', ] pool = ProcessPoolExecutor(4) for url in url_list: obj = pool.submit(get,url) obj.add_done_callback(parse) # 增加回调函数,发布完任务后直接执行本行,当同一进程执行完第一次任务后,主进程立即对其结果进行分析,不必等所有进程全部执行完再分析,提高了效率 ''' 5480 正在爬取http://www.JD.com 14008 正在爬取http://www.JD.com 12952 正在爬取http://www.JD.com 6924 正在爬取http://www.taobao.com 8588 分析结果:83912 8588 分析结果:83912 5480 正在爬取http://www.baidu.com 14008 正在爬取https://www.sohu.com 8588 分析结果:141787 6924 正在爬取https://www.youku.com 8588 分析结果:2381 5480 正在爬取http://www.sina.com.cn 8588 分析结果:726144 6924 正在爬取https://www.sohu.com 8588 分析结果:83912 8588 分析结果:571495 12952 正在爬取https://www.youku.com 8588 分析结果:182205 8588 分析结果:182205 8588 分析结果:720854 '''
可以看出:现在执行分析任务的是主进程,也就是说子进程执行的还是爬取的任务,拿到返回值后,将结果丢给回调函数add_done_callback,子进程继续下一个爬取任务,同时回调函数分析结果。
其实这样达到的效果就是,在子进程进行爬取任务的同时主进程进行分析,而不必等所有爬取任务都执行完在进行分析,但就分析任务而言其实还是串行的。
因此,当回调函数执行的是IO较多的任务时,并不能提高效率,此时就需要牺牲开销,再开一个进程池。
总结:如果多个任务,多进程多线程处理IO任务:
- 剩下的任务非IO阻塞,使用异步+回调函数
- 剩下的任务IO<<子进程任务的IO,使用异步+回调函数
- 剩下的任务IO>=子进程任务的IO,使用第二种方法或增加一个进程池或线程池
-
线程队列
'queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.' # 当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。
-
先进先出型(FIFO)queue
import queue #不需要通过threading模块里面导入,直接import queue就可以了,这是python自带的 #用法基本和我们进程multiprocess中的queue是一样的 q=queue.Queue() q.put('first') q.put('second') q.put('third') # q.put_nowait() #没有数据就报错,可以通过try来搞 print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() #没有数据就报错,可以通过try来搞 ''' 结果(先进先出): first second third '''
-
后进先出型(LIFO)栈
import queue q=queue.LifoQueue() #队列,类似于栈,先进后出的顺序 q.put('first') q.put('second') q.put('third') # q.put_nowait() print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() ''' 结果(后进先出): third second first '''
-
优先级队列
import queue q=queue.PriorityQueue() #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高 q.put((-10,'a')) q.put((-5,'a')) #负数也可以 # q.put((20,'ws')) #如果两个值的优先级一样,那么按照后面的值的acsii码顺序来排序,如果字符串第一个数元素相同,比较第二个元素的acsii码顺序 # q.put((20,'wd')) # q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,('w',1))) #优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,可以是元祖,也是通过元素的ascii码顺序来排序 q.put((20,'b')) q.put((20,'a')) q.put((0,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ''' 结果(数字越小优先级越高,优先级高的优先出队): '''
-
-
事件Event(线程、进程)
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行。进程同样。
event中的方法如下:
event.isSet()/event.is_set() # 返回event的状态值; event.wait() # 如果 event.isSet()==False将阻塞线程; event.set() # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear() # 恢复event的状态值为False。
简单用法事例:
import time from threading import Thread from threading import current_thread from threading import Event event = Event() def task(): print(f'{current_thread().name} 检测服务器是否正常开启') time.sleep(3) event.set() # 更改event状态为True def task1(): print(f'{current_thread().name} 正在尝试连接服务器') event.wait() # 轮询检测event是否为True,当为True执行下一行否则阻塞 #event.wait(2) # 可设置等待时间,当超过两秒时自动执行下一行 print(f'{current_thread().name} 连接成功') if __name__ == '__main__': t1 = Thread(target=task1) t2 = Thread(target=task1) t3 = Thread(target=task1) t = Thread(target=task) t.start() t1.start() t2.start() t3.start() # Thread-4 检测服务器是否正常开启.... # Thread-1 正在尝试连接服务器 # Thread-2 正在尝试连接服务器 # Thread-3 正在尝试连接服务器 # Thread-2 连接成功 # Thread-1 连接成功 # Thread-3 连接成功
可对上述案例稍作修改使其具有计时器功能:
from threading import Thread,Event import threading import time,random def conn_mysql(): count = 1 while not event.is_set():# 初始值event.is_set()为False,由于三条线程并发执行,此时check也在执行,当执行第二次的时候check可能已经执行完,此时event.is_set()已经成了True if count > 3: raise TimeoutError('连接超时') print(f'{threading.current_thread().name}第{count}次尝试连接') event.wait(1) # 等待1秒,不管event.is_set()结果是否为True都执行下一行 count += 1 print(f'{threading.current_thread().name}连接成功') def check_mysql(): print(f'33[45m{threading.current_thread().name}正在检查mysql33[0m') time.sleep(random.randint(1,2)) event.set() # 将event值设为True if __name__ == '__main__': event = Event() conn1 = Thread(target=conn_mysql) conn2 = Thread(target=conn_mysql) check = Thread(target=check_mysql) conn1.start() conn2.start() check.start() # Thread-1第1次尝试连接 # Thread-2第1次尝试连接 # Thread-3正在检查mysql # Thread-1第2次尝试连接 # Thread-2第2次尝试连接 # Thread-1连接成功 # Thread-2连接成功
模拟红绿灯案例:
'''标志位设定,代表绿灯,直接通行;标志位被清空,代表红灯;wait()等待变绿灯''' import threading,time event=threading.Event() def lighter(): '''0<count<5为绿灯,5<count<10为红灯,count>10重置标志位''' event.set() # count=0 while True: if count>5 and count<10: event.clear() print("33[1;41m red light is on 33[0m") elif count>10: event.set() count=0 else: print("33[1;42m green light is on 33[0m") time.sleep(1) count+=1 def car(name): '''红灯停,绿灯行''' while True: if event.is_set(): print("[%s] is running..."%name) time.sleep(3) else: print("[%s] sees red light,need to wait three seconds"%name) event.wait() print("33[1;34;40m green light is on,[%s]start going 33[0m"%name) light=threading.Thread(target=lighter,) light.start() car1=threading.Thread(target=car,args=("Xiaoxiong",)) car1.start()
-
协程
-
问题提出:并发的本质:1.CPU(遇到IO或单个线程执行时间过长)在多个线程间来回切换。2.将正在执行的线程挂起,保存它的运行状态,等到IO结束再继续执行。
在线程间切换CPU是由操作系统控制的,而操作系统切换的开销是比较大的(开关线程,创建寄存器、堆栈等,在他们之间进行切换),而且由于我们的电脑同一时间运行的程序较多,并不能保证CPU一定在我们指定的这几个线程间切换,这样就降低了效率。
-
解决方法:有没有一种方法可以让CPU在遇到阻塞的时候可以不通过操作系统来切换CPU,而是直接让CPU在这个线程中的多个任务间来回切换,这样就确保了,在一个线程之内一个任务遇到IO,CPU还会继续执行本线程内的其他任务,从而提高效率。
-
协程:协程其实是另一种线程,只是它可以在单线程下,当一个任务遇到IO阻塞就让CPU切换到另一个任务去执行,以此来提上效率。
# 协程的工作原理: # 协程就是告诉Cpython解释器,你不是nb吗,不是搞了个GIL锁吗,那好,我就自己搞成一个线程让你去执行,省去你切换线程的时间,我自己切换比你切换要快很多,避免了很多的开销,对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。
-
定义
协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
需要强调的是:
#1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行) #2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
-
优点:1.协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,更加轻量级;2.单线程内就可以实现并发的效果,最大限度利用CPU
-
缺点:1.协程的本质是单线程下,不能利用多核,但是可以一个程序开启多进程,每个进程内开启多线程,每个线程内开启进程;2.一旦协程出现阻塞(协程内的多个任务同时阻塞)将会阻塞整个线程。
-
总结:
- 在一个单线程例实现并发
- 修改共享数据不许加锁
- 用户程序里自己保存多个控制流的上下文栈
- 一个协程遇到IO操作自动切换到其他的协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
-
-
Greenlet
使用greenlet模块可以比使用yield生成器更简单实现单线程内多任务之间的切换。
from greenlet import greenlet def eat(name): print('%s eat 1' %name) #2 g2.switch('taibai') #3 遇到switch切换 print('%s eat 2' %name) #6 g2.switch() #7 def play(name): print('%s play 1' %name) #4 g1.switch() #5 print('%s play 2' %name) #8 g1=greenlet(eat) g2=greenlet(play) g1.switch('taibai')#可以在第一次switch时传入参数,以后都不需要 1
greenlet只是提供了一种比yield生成器更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
而且,单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度。
-
Gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
import gevent import time def eat(name): print('%s eat 1' %name) # 1 gevent.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) # 2 gevent.sleep(1) print('%s play 2' %name) g1 = gevent.spawn(eat, 'alex') # 创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的,spawn是异步提交任务 g2 = gevent.spawn(play, name='taibai') # g1.join() # g2.join() #或者gevent.joinall([g1,g2]) gevent.joinall([g1,g2]) print('主') # alex eat 1 # taibai play 1 # taibai play 2 # alex eat 2 # 主
虽然遇到IO会自动切换任务,但当遇到time.sleep()这种阻塞时不能识别,此时应该加上from gevent import monkey;monkey.patch_all()如下:
import threading from gevent import monkey monkey.patch_all() # 将你代码中的所有的IO都标识. import gevent # 直接导入即可 import time def eat(): print(f'线程1:{threading.current_thread().getName()}') print('eat food 1') time.sleep(3) # 加上mokey就能够识别到time模块的sleep了 print('eat food 2') def play(): print(f'线程2:{threading.current_thread().getName()}') print('play 1') time.sleep(1) # 来回切换,直到一个I/O的时间结束,这里都是我们个gevent做得,不再是控制不了的操作系统了。 print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print(f'主:{threading.current_thread().getName()}') # 线程1:DummyThread-1 # eat food 1 # 线程2:DummyThread-2 # play 1 # play 2 # eat food 2 # 主:MainThread
-