一、必备知识
1.socket客户端:
阻塞式客户端:
import socket
#实例化socket对象
sock = socket.socket()
#向服务器发起连接
sock.connect(("43.226.160.17",80)) #连接会阻塞
# 仿HTTP发送请求
data = b"GET / HTTP/1.0
host: dig.chouti.com
"
sock.sendall(data)
#接收服务器响应信息
response = sock.recv(8096) #等待数据回收时也会阻塞
print(response)
#关闭socket连接
sock.close()
伪非阻塞式:
import socket
#实例化socket对象
sock = socket.socket()
#设置非阻塞,不等待连接成功直接执行后面的代码,由于连接的操作不可能立即完成所以
#会报一个BlockingIOError:无法立即完成一个非阻止性套接字操 的错误,因此需要处理异常
sock.setblocking(False)
#向服务器发起连接
try:
sock.connect(("43.226.160.17",80)) #连接会阻塞
except BlockingIOError as e:
print(e)
import time
#虽然会报错,但连接的请求已经发出去,所以连接请求依然生效,
#我们可以使用这段时间进行其它的工作,达到资源的最大利用,这里用sleep代指
time.sleep(5)
# 仿HTTP发送请求
data = b"GET / HTTP/1.0
host: dig.chouti.com
"
#由于没有建立连接,所以这里还发不出消息
try:
sock.sendall(data)
#接收服务器响应信息
response = sock.recv(8096) #等待数据回收时也会阻塞
except Exception as e:
print(e)
print(response)
#关闭socket连接
sock.close()
2.IO多路复用加socket实现非阻塞
伪代码:
import socket
import select
socket_list = []
for url in ["www.baidu.com","www.bing.com"]:
client = socket.socket()
client.setblocking(False)
try:
client.connect((url,80))
except BlockingIOError as e:
print(e)
socket_list.append(client)
#事件循环
while True:
r,w,e = select.select(socket_list,socket_list,[],0.05)
#w代指每一个连接成功的client
for obj in w:
obj.send("GET / HTTP/1.0
host: baidu.com
") #这里host写死,下面的代码会解决
#r代指client有改变,可以收数据了
for obj in r:
response = obj.recv(1024)
print(response)
上边代码中,当对多个网站进行连接或爬取时,每一个单独的Url都不会阻塞,再通过select监测数据的变化,可以及时接收数据,又不会挡住后边Url的爬取工作,实现了简单的非阻塞的目标。
但这只是一段伪代码,接下来根据这个思路实现一个真正的异步非阻塞模块。
二、简单的异步非阻塞模块。
import socket
import select
class Request(object):
"""
封装socket对象,使每次循环时创建的socket对象能对应它的req_info字典,
方便其利用字典拿到对应的host信息等
"""
def __init__(self, sock, info):
self.sock = sock
self.info = info
def fileno(self):
return self.sock.fileno()
class AsyncRequest(object):
def __init__(self):
self.sock_list = []
self.conns = []
def add_request(self, req_info):
"""
创建请求
req_info: {'host': 'www.baidu.com', 'port': 80, 'path': '/'},
:return:
"""
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect((req_info['host'], req_info['port']))
except BlockingIOError as e:
pass
obj = Request(sock, req_info)
self.sock_list.append(obj)
self.conns.append(obj)
def run(self):
"""
开始事件循环,检测:连接成功?数据是否返回?
:return:
"""
while True:
# select.select([socket对象,])
# 可是任何对象,对象一定要有fileno方法,实际上执行的是对象.fileno()
# select.select([request对象,])
r, w, e = select.select(self.sock_list, self.conns, [], 0.05)
# w,是否连接成功
for obj in w:
# 检查obj:request对象
# socket, {'host': 'www.baidu.com', 'port': 80, 'path': '/'},
data = "GET %s http/1.1
host:%s
" % (obj.info['path'], obj.info['host'])
obj.sock.send(data.encode('utf-8'))
#连接成功后从列表中删除此obj对象,避免重复连接
self.conns.remove(obj)
# 数据返回,接收到数据
for obj in r:
response = obj.sock.recv(8096)
#函数名加括号运行对应的回调函数
obj.info['callback'](response)
#相应的为避免重复接收移除已经接收成功的对象
self.sock_list.remove(obj)
# 所有请求已经返回
if not self.sock_list:
break
if __name__ == '__main__':
#指定回调函数,可以在屏幕输出,也可以写入文件、数据库等
def callback_fun1(response):
print(response)
def callback_fun2(response):
pass
# with open ......
url_list = [
{'host': 'www.baidu.com', 'port': 80, 'path': '/', 'callback': callback_fun1},
{'host': 'www.cnblogs.com', 'port': 80, 'path': '/index.html', 'callback': callback_fun2},
{'host': 'www.bing.com', 'port': 80, 'path': '/', 'callback': callback_fun1},
]
obj = AsyncRequest()
for item in url_list:
obj.add_request(item)
obj.run()