预先知道:
1.并发:是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机(CPU)上运行,
但任一个时刻点上只有一个程序在处理机上运行。
2.并行:是指任何时间点,有多个程序运行在多个CPU上(最多和CPU数量一致)
3.同步:是指代码调用IO操作时,必须等待IO操作完成才能返回的调用方式。
4.异步:是指代码调用IO操作时,不必等待IO操作完成就能返回的调用方式。
5.阻塞:是指调用函数的时候当前线程被挂起。
6.非阻塞:是指调用函数的时候当前线程不会被挂起,而是立即返回。
unix下的5大io类型
1阻塞I式/O:系统调用不会立即返回结果,当前线程会阻塞,等到获得结果或报错时在返回(问题:如在调用send()的同时,线程将被阻塞,
在此期间,线程将无法执行任何运算或响应任何的网络请求。) 2非阻塞式I/O:调用后立即返回结果(问题:不一定三次握手成功,recv() 会被循环调用,循环调用recv()将大幅度推高CPU 占用率),
做计算任务或者再次发起其他连接就较有优势 3I/O复用:它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。
(阻塞式的方法,可以监听多个socket状态)(问题:将数据从内核复制到用户空间的时间不能省) 5异步I/O:它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,
也不需要主动的去拷贝数据。
通过非阻塞io实现http请求:
import socket from urllib.parse import urlparse def get_url(url): #通过socket请求html url=urlparse(url) host=url.netloc path=url.path if path=="": path="/" #建立socket连接 client=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #设置成非阻塞(抛异常:BlockingIOError: [WinError 10035] 无法立即完成一个非阻止性套接字操作。) client.setblocking(False) try: client.connect((host,80)) except BlockingIOError as e: pass #向服务器发送数据(还未连接会抛异常) while True: try: client.send("GET {} HTTP/1.1 Host:{} Connection:close ".format(path, host).encode("utf8")) break except OSError as e: pass #将数据读取完 data=b"" while True: try: d=client.recv(1024) except BlockingIOError as e: continue if d: data+=d else: break #会将header信息作为返回字符串 data=data.decode('utf8') print(data.split(' ')[1]) client.close() if __name__=='__main__': get_url('http://www.baidu.com')
通过select完成http请求(利用循环回调):
优点:并发性高(驱动整个程序主要是回调循环loop()函数实现,不会等待。没有线程的切换,只有一个线程,党一个URL连接建立完成后就会注册,然后进入执行)
#自动根据环境选择poll和epoll from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE selector=DefaultSelector() urls=[] #全局变量 stop=False class Fetcher: def connected(self, key): #取消注册 selector.unregister(key.fd) self.client.send("GET {} HTTP/1.1 Host:{} Connection:close ".format(self.path, self.host).encode("utf8")) selector.register(self.client.fileno(),EVENT_READ,self.readable) def readable(self,key): d = self.client.recv(1024) if d: self.data += d else: selector.unregister(key.fd) # 会将header信息作为返回字符串 data = self.data.decode('utf8') print(data.split(' ')[1]) self.client.close() urls.remove(self.spider_url) if not urls: global stop stop=True def get_url(self,url): self.spider_url = url url = urlparse(url) self.host = url.netloc self.path = url.path self.data = b"" if self.path == "": self.path = "/" # 建立socket连接 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.setblocking(False) try: self.client.connect((self.host, 80)) except BlockingIOError as e: pass #注册写事件,及回调函数 selector.register(self.client.fileno(),EVENT_WRITE,self.connected) def loop(): #回调+事件循环+select(poll/epoll) #事件循环,不停的调用socket的状态并调用对应的回调函数 #判断哪个可读可写,select本身不支持register模式 #socket状态变化后的回调使用程序员完成的 if not stop: while True: ready=selector.select() for key,mask in ready: call_back=key.data call_back(key) if __name__=='__main__': fetcher=Fetcher() fetcher.get_url('http://www.baidu.com') loop()
协程:不带返回值的函数调用,是一个可以暂停的函数。
解决方案:采用同步的方式编写异步的代码;采用单线程去解决任务。
协程的调度:时间循环+协程模式
#生成器是可以暂停的函数 import inspect # def gen_func(): # value=yield from # #第一返回值给调用方, 第二调用方通过send方式返回值给gen # return "bobby" #1. 用同步的方式编写异步的代码, 在适当的时候暂停函数并在适当的时候启动函数 import socket def get_socket_data(): yield 1 def downloader(url): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.setblocking(False) try: client.connect((host, 80)) # 阻塞不会消耗cpu except BlockingIOError as e: pass selector.register(self.client.fileno(), EVENT_WRITE, self.connected) #如果get_socket_data()中出现异常,会直接抛给downloader(向上抛) source = yield from get_socket_data() data = source.decode("utf8") html_data = data.split(" ")[1] print(html_data) def download_html(html): html = yield from downloader() if __name__ == "__main__": #协程的调度依然是 事件循环+协程模式 ,协程是单线程模式 pass