python day 15
2019/10/20
学习资料来自老男孩教育
1. IO多路复用
'''
I/O多路复用指:通过一种机制,可以监视多个描述符(文件句柄),一旦某个描述符就绪(一般是读就绪或者写就绪),就能够通知程序进行相应的读写操作。
第一个问题:
server,监听两个端口,
解决方案:
windows通过select(最大1024)来解决。
linux使用select,poll,epoll(异步实现)。
第一阶段:
socket,服务端只能处理一个请求。
第二阶段:
select + socket,伪并发。
a, r_list:既读又写
b,r_list,w_list实现读写分离
第三阶段:
socketserver,真正实现了并发,不是伪并发。
基于select/epoll + socket + 多线程,实现了并发操作。
知识储备:多线程
注意:类的继承
'''
import select
import socket
sk1 = socket.socket()
addr = ('127.0.0.1', 8001)
sk1.bind(addr)
sk1.listen(5)
sk2 = socket.socket()
addr = ('127.0.0.1', 8002)
sk2.bind(addr)
sk2.listen(5)
sk3 = socket.socket()
addr = ('127.0.0.1', 8003)
sk3.bind(addr)
sk3.listen(5)
inputs = [sk1, ]
outputs = []
messages_dict = {}
while True:
# select内部自动监听sk1,sk2,sk3,一旦某个或多个文件描述符(此处指socket对象)发生变化,就会感知到
# 如果sk1的accept方法有返回值,则代表连接成功
# r_list = [sk1],r表示ready for reading,w表示ready for writing,e表示exceptions。
# inputs中哪个元素发生变化了,r_list就包含这个元素,如果第三个参数inputs中发生错误了,则将这个元素从第一个参数inputs中移除。
# 如果有人第一次连接,sk1发生变化.
# select内部自动监听socket对象,一旦socket变换就会感知到
r_list, w_list, e_list = select.select(inputs, outputs, inputs, 1)
# 第一次小明连接时:inputs=[sk1,小明],只有sk1发生变化,所以r_list = [sk1,]
# 第二次小华连接时:inputs = [sk1,小明,小华],又是只有sk1发生变化,所以r_list = [sk1,]
# 当小明给sk1发消息时,此时小明发生了变化,所以r_list=[小明]
print('正在监听的socket对象%s' % len(inputs))
print(r_list)
for sk_or_conn in r_list:
if sk_or_conn == sk1:
# 表示有新用户来连接
conn, addr = sk_or_conn.accept()
inputs.append(conn)
# {'小明':[],'张辉':[]}
messages_dict[conn] = []
else:
# 有老用户发消息了
try:
data = sk_or_conn.recv(1024)
except Exception as e:
# 有异常情况发生,比如用户中断连接
inputs.remove(sk_or_conn)
else:
# 用户正常发消息
data2 = data.decode('utf-8')+'好'
# sk_or_conn.sendall(data2.encode('utf-8'))
messages_dict[sk_or_conn].append(data2)
outputs.append(sk_or_conn)
for conn in w_list:
recv_str = messages_dict[conn][0]
del messages_dict[conn][0]
data3 = input('请输入回复的消息:
>>>').encode('utf-8')
conn.sendall(data3)
outputs.remove(conn)
for conn in e_list:
inputs.remove(conn)
2. socketserver源码分析
#!/usr/bin/env python
# encoding: utf-8
'''
@author: lanxing
@contact: bluestarpin@163.com
@file: socket_server.py
@time: 2019-10-19 12:05
@desc:
'''
import socketserver
class MyServer(socketserver.BaseRequestHandler):
def handle(self):
# self.request,self.client_address,self.server
conn, addr = self.request, self.client_address
conn.sendall('欢迎致电蓝星,请发送消息'.encode('utf-8'))
print('等待数据中')
while True:
ret = conn.recv(1024).decode('utf-8')
print(ret)
if ret == 'q':
break
data = input('请输入发送给%s的消息:
>>>' % addr[0]).strip()
conn.sendall(data.encode('utf-8'))
if __name__ == '__main__':
'''
通过socketserver模块的ThreadingTCPServer类创建一个实例tcp_server。实例必须通过init方法进行构造。
寻找init方法的顺序是ThreadingTCPServer类>>>ThreadingMixIn类>>>TCPServer类>>>BaseServer类.
1. 在TCPServer类找到了init方法,所以先将两个参数('192.168.131.1', 9999,)与MyServer传入TCPServer类的init方法进行实例的构造。
TCPServer类的init方法有三个参数需要传入,server_address, RequestHandlerClass, bind_and_activate=True,第三个参数默认是True。
即 server_address=('192.168.131.1', 9999,), RequestHandlerClass= MyServer。
1.1 执行TCPServer类的init方法时,首先需要执行TCPServer类的父类BaseServer类的init方法。
BaseServer类的init方法需要传入2个位置参数,server_address, RequestHandlerClass,所以将('192.168.131.1', 9999,)与MyServer分别传入。
BaseServer类的init方法执行完毕时,tcp_server实例有下面4个属性:
tcp_server.server_address = ('192.168.131.1', 9999,)
tcp_server.RequestHandlerClass = MyServer
tcp_server.__is_shut_down = threading.Event()
1.1.1 上面这条代码表示通过threading模块的Event类创建的实例赋值给了tcp_server.__is_shut_down
执行Event类的构造方法init之后,tcp_server.__is_shut_down有两个属性:
tcp_server.__is_shut_down._cond = Condition(Lock())
1.1.1.1 上面这条代码,又通过threading模块的Condition类创建了一个实例,并赋值给了tcp_server.__is_shut_down._cond
Condition类init方法只需要传入一个参数,其值默认是lock=None,现在传入了参数Lock()。
1.1.1.1.1 参数Lock()不知道是类还是函数的返回值,Lock是个变量,其值是_allocate_lock,而_allocate_lock=_thread.allocate_lock即
Lock()=_allocate_lock()=_thread.allocate_lock(),最后证明Lock是一个函数名,返回的结果是一个锁对象。
tcp_server.__is_shut_down._flag = False
tcp_server.__shutdown_request = False
1.2 执行完TCPServer类的父类BaseServer类的init方法后,再接着执行:
self.socket = socket.socket(self.address_family,self.socket_type)
其中address_family,socket_type是TCPServer类的类变量,
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
即tcp_server.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
1.3 因为bind_and_activate参数未传入TCPServer类的init方法,所以此参数默认是True,当此参数是True时,执行下面的代码。
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise
1.3.1 正常情况下首先执行self.server_bind(),就是执行下面的代码:
if self.allow_reuse_address: # allow_reuse_address 是TCPServer类的类变量,默认是False
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 所以此句代码不会执行。
self.socket.bind(self.server_address) # tcp_server.socket.bind(('192.168.131.1', 9999,))
self.server_address = self.socket.getsockname() # 重新赋值给server_address,getsockname方法返回的就是bind方法绑定的地址
1.3.2 再接着执行self.server_activate(),就是执行下面的代码:
self.socket.listen(self.request_queue_size)
其中request_queue_size为TCPServer类的类变量,为5。
1.3.3 发生异常时执行self.server_close(),即
# self.socket.close()
至此,ThreadingTCPServer类的init构造方法执行完毕,创建的实例tcp_server具有下列属性。
tcp_server.server_address = ('192.168.131.1', 9999,)
tcp_server.RequestHandlerClass = MyServer
tcp_server.__is_shut_down = threading.Event()
tcp_server.__is_shut_down._cond = Condition(Lock())
tcp_server.__is_shut_down._flag = False
tcp_server.__shutdown_request = False
tcp_server.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
同时,tcp_server.socket执行了下面两个方法:
tcp_server.socket.bind(('192.168.131.1', 9999,))
tcp_server.socket.listen(5)
'''
tcp_server = socketserver.ThreadingTCPServer(('192.168.131.1', 9999,), MyServer)
'''
2. 执行tcp_server对象的serve_forever方法,寻找serve_forever方法的顺序是:
ThreadingTCPServer类>>>ThreadingMixIn类>>>TCPServer类>>>BaseServer类.
在BaseServer类中找到了serve_forever方法。
2.1 执行BaseServer类中的serve_forever方法,就是执行下面代码:
self.__is_shut_down.clear()
try:
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
while not self.__shutdown_request:
ready = selector.select(poll_interval)
# bpo-35017: shutdown() called during select(), exit immediately.
if self.__shutdown_request:
break
if ready:
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
2.1.1 首先执行self.__is_shut_down.clear(),即tcp_server.__is_shut_down.clear()
也就是执行threading模块的Event类的实例的clear方法。其方法如下:
with self._cond: # 即 with tcp_server.__is_shut_down._cond
self._flag = False # 即 tcp_server.__is_shut_down._flag = False
2.1.2 正常情况下,首先执行:with _ServerSelector() as selector:
_ServerSelector=selectors.SelectSelector,而SelectSelector是selectors模块下的一个类。
通过_ServerSelector()创建实例,就是通过selectors模块下的SelectSelector类创建实例。
执行selectors模块下的SelectSelector类的init方法时:
2.1.2.1 首先需要执行其父类_BaseSelectorImpl的init方法,即下面代码:
# this maps file descriptors to keys
self._fd_to_key = {}
# read-only mapping returned by get_map()
self._map = _SelectorMapping(self)
也就是说 with _ServerSelector() as selector 中的selector对象具有下面2个属性了:
selector._fd_to_key = {}
selector._map = _SelectorMapping(selector)
2.1.2.1.2 上面代码selector._map = _SelectorMapping(self),就是通过_SelectorMapping创建了一个实例
在执行_SelectorMapping类的init方法时,传入一个参数,即selector这个对象自己。此时
selector._map._selector = selector
2.1.2.2 之后执行:
self._readers = set()
self._writers = set()
即_ServerSelector类的实例selector有下面两个属性了:
selector._readers = set()
selector._writers = set()
至此,with _ServerSelector() as selector 此条语句执行完毕,selector对象有下面4个属性:
selector._fd_to_key = {}
selector._map = _SelectorMapping(selector)
selector._map._selector = selector
selector._readers = set()
selector._writers = set()
2.1.3 其次执行:selector.register(self, selectors.EVENT_READ)。即执行selector对象的register方法。
即执行:selector.register(fileobj=self, events=selectors.EVENT_READ, data=None)
其中selectors.EVENT_READ表示selectors模块下的EVENT_READ变量,而EVENT_READ= (1 << 0),即EVENT_READ=int(1*2**0)=1
其中selectors.EVENT_WRITE表示selectors模块下的EVENT_WRITE变量,而EVENT_WRITE= (1 << 1),即EVENT_WRITE=int(1*2**1)=2
2.1.3.1 首先需要执行父类的register方法,并将返回值赋值给变量key。
即_BaseSelectorImpl.register(fileobj=self, events=selectors.EVENT_READ, data=None),也就是执行下面代码:
if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
raise ValueError("Invalid events: {!r}".format(events))
key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
此处SelectorKey=namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])函数的返回值,是Type[tuple]。
上面这句没看懂,导致下面key.fd也看不懂了,应当跟多路复用时的select.select方法差不多。
if key.fd in self._fd_to_key:
raise KeyError("{!r} (FD {}) is already registered".format(fileobj, key.fd))
self._fd_to_key[key.fd] = key
return key
2.1.4 其次执行:ready = selector.select(poll_interval)
即执行selectors模块下的类SelectSelector类的select方法,如下:
if sys.platform == 'win32':
def _select(self, r, w, _, timeout=None):
r, w, x = select.select(r, w, w, timeout)
return r, w + x, []
else:
_select = select.select
def select(self, timeout=None):
timeout = None if timeout is None else max(timeout, 0)
ready = []
try:
r, w, _ = self._select(self._readers, self._writers, [], timeout)
except InterruptedError:
return ready
r = set(r)
w = set(w)
for fd in r | w:
events = 0
if fd in r:
events |= EVENT_READ # 此句是什么意思,没看懂,原来是events要么等于0,或者等于1
if fd in w:
events |= EVENT_WRITE # 按位或的意思,要么等于0,或者等于2
# 原来是events = events | EVENT_WRITE,即events要么等于events或者等于 EVENT_WRITE
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
需要查看别人的解释来理解,看https://www.cnblogs.com/zzzlw/p/9384308.html这篇文章
2.1.5 如果ready不是空列表,代表有人连接或发送消息过来,则执行self._handle_request_noblock(),即执行tcp_server._handle_request_noblock(),即是如下代码:
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request)
2.1.5.1 首先尝试执行 request, client_address = self.get_request(),即执行
tcp_server.get_request()方法。
tcp_server.get_request在ThreadingTCPServer类,ThreadingMixIn类都没找到,在TCPServer类中找到了,所以是执行TCPServer类中的get_request方法。
tcp_server.get_request()方法的返回值是self.socket.accept(),看到accept就知道了,
request,client_address = conn,addr = self.socket.accept()
2.1.5.2 没有异常时,再接着执行self.verify_request(request, client_address),这就是基类BaseServer的方法了,因为子类都没有这个方法,其返回值是True。
2.1.5.3 再尝试执行self.process_request(request, client_address),这是ThreadinMixIn类的方法。
tcp_server.process_request(request, client_address),即执行如下代码:
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,args = (request, client_address))
# 创建一个线程,线程处理的任务函数名是tcp_server.process_request_thread,任务函数的参数元组是(request, client_address)
t.daemon = self.daemon_threads
# daemon_threads是类变量,其值为False,block_on_close是类变量,其值是True。
if not t.daemon and self.block_on_close:
if self._threads is None:
# _threads是类变量,其值是None
self._threads = []
self._threads.append(t)
t.start()
# t.start()就是执行tcp_server.process_request_thread(request, client_address)
2.1.5.3.1 在创建线程时,接收的任务函数名是tcp_server.process_request_thread,也在ThreadingMixIn类中。
其定义如下:
def process_request_thread(self, request, client_address):
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)
2.1.5.3.1.1 尝试执行self.finish_request(request, client_address)时,实际调用的是
self.RequestHandlerClass(request, client_address, self),也就是
tcp_server.MyServer(request, client_address,tcp_server)
也就是通过MyServer类创建了一个对象,而MyServer类并没有init方法,
所以去MyServer类的父类BaseRequestsHandler类寻找init方法,其定义如下:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
# 因为MyServer类定义了handle方法,所以按照继承顺序,先调用MyServer类的handle方法,
# 而不是BaseRequestsHandler类的handle方法
finally:
self.finish()
# 也就是说直到这一步,才看到MyServer类的handle函数。
# 花了4个小时才算分析完毕,真是折腾人啊
'''
tcp_server.serve_forever()