zoukankan      html  css  js  c++  java
  • python day 15: IO多路复用,socketserver源码培析,

    python day 15

    2019/10/20

    学习资料来自老男孩教育

    1. IO多路复用

    '''
    I/O多路复用指:通过一种机制,可以监视多个描述符(文件句柄),一旦某个描述符就绪(一般是读就绪或者写就绪),就能够通知程序进行相应的读写操作。
    第一个问题:
        server,监听两个端口,
        解决方案:
            windows通过select(最大1024)来解决。
            linux使用select,poll,epoll(异步实现)。
    
    第一阶段:
        socket,服务端只能处理一个请求。
    
    第二阶段:
        select + socket,伪并发。
        a, r_list:既读又写
        b,r_list,w_list实现读写分离
    
    第三阶段:
        socketserver,真正实现了并发,不是伪并发。
            基于select/epoll + socket + 多线程,实现了并发操作。
    
    知识储备:多线程
    
    注意:类的继承
    '''
    
    import select
    import socket
    
    sk1 = socket.socket()
    addr = ('127.0.0.1', 8001)
    sk1.bind(addr)
    sk1.listen(5)
    
    sk2 = socket.socket()
    addr = ('127.0.0.1', 8002)
    sk2.bind(addr)
    sk2.listen(5)
    
    sk3 = socket.socket()
    addr = ('127.0.0.1', 8003)
    sk3.bind(addr)
    sk3.listen(5)
    inputs = [sk1, ]
    outputs = []
    messages_dict = {}
    while True:
        # select内部自动监听sk1,sk2,sk3,一旦某个或多个文件描述符(此处指socket对象)发生变化,就会感知到
        # 如果sk1的accept方法有返回值,则代表连接成功
        # r_list = [sk1],r表示ready for reading,w表示ready for writing,e表示exceptions。
        # inputs中哪个元素发生变化了,r_list就包含这个元素,如果第三个参数inputs中发生错误了,则将这个元素从第一个参数inputs中移除。
        # 如果有人第一次连接,sk1发生变化.
        # select内部自动监听socket对象,一旦socket变换就会感知到
        r_list, w_list, e_list = select.select(inputs, outputs, inputs, 1)
        # 第一次小明连接时:inputs=[sk1,小明],只有sk1发生变化,所以r_list = [sk1,]
        # 第二次小华连接时:inputs = [sk1,小明,小华],又是只有sk1发生变化,所以r_list = [sk1,]
        # 当小明给sk1发消息时,此时小明发生了变化,所以r_list=[小明]
        print('正在监听的socket对象%s' % len(inputs))
        print(r_list)
        for sk_or_conn in r_list:
            if sk_or_conn == sk1:
                # 表示有新用户来连接
                conn, addr = sk_or_conn.accept()
                inputs.append(conn)
                # {'小明':[],'张辉':[]}
                messages_dict[conn] = []
            else:
                # 有老用户发消息了
                try:
                    data = sk_or_conn.recv(1024)
                except Exception as e:
                    # 有异常情况发生,比如用户中断连接
                    inputs.remove(sk_or_conn)
                else:
                    # 用户正常发消息
                    data2 = data.decode('utf-8')+'好'
                    # sk_or_conn.sendall(data2.encode('utf-8'))
                    messages_dict[sk_or_conn].append(data2)
                    outputs.append(sk_or_conn)
        for conn in w_list:
            recv_str = messages_dict[conn][0]
            del messages_dict[conn][0]
            data3 = input('请输入回复的消息:
    >>>').encode('utf-8')
            conn.sendall(data3)
            outputs.remove(conn)
        for conn in e_list:
            inputs.remove(conn)
    

    2. socketserver源码分析

    ThreadingTCPServer类的继承关系

    创建ThreadingTCPServer实例

    server_forver方法的实现

    #!/usr/bin/env python
    # encoding: utf-8
    '''
    @author: lanxing
    @contact: bluestarpin@163.com
    @file: socket_server.py
    @time: 2019-10-19 12:05
    @desc:
    '''
    
    import socketserver
    
    
    class MyServer(socketserver.BaseRequestHandler):
        def handle(self):
            # self.request,self.client_address,self.server
            conn, addr = self.request, self.client_address
            conn.sendall('欢迎致电蓝星,请发送消息'.encode('utf-8'))
            print('等待数据中')
            while True:
                ret = conn.recv(1024).decode('utf-8')
                print(ret)
                if ret == 'q':
                    break
                data = input('请输入发送给%s的消息:
    >>>' % addr[0]).strip()
                conn.sendall(data.encode('utf-8'))
    
    
    if __name__ == '__main__':
        '''
        通过socketserver模块的ThreadingTCPServer类创建一个实例tcp_server。实例必须通过init方法进行构造。
        寻找init方法的顺序是ThreadingTCPServer类>>>ThreadingMixIn类>>>TCPServer类>>>BaseServer类.
        1. 在TCPServer类找到了init方法,所以先将两个参数('192.168.131.1', 9999,)与MyServer传入TCPServer类的init方法进行实例的构造。
            TCPServer类的init方法有三个参数需要传入,server_address, RequestHandlerClass, bind_and_activate=True,第三个参数默认是True。
            即 server_address=('192.168.131.1', 9999,), RequestHandlerClass= MyServer。
                1.1 执行TCPServer类的init方法时,首先需要执行TCPServer类的父类BaseServer类的init方法。
                    BaseServer类的init方法需要传入2个位置参数,server_address, RequestHandlerClass,所以将('192.168.131.1', 9999,)与MyServer分别传入。
                    BaseServer类的init方法执行完毕时,tcp_server实例有下面4个属性:
                    tcp_server.server_address = ('192.168.131.1', 9999,)
                    tcp_server.RequestHandlerClass = MyServer
                    tcp_server.__is_shut_down = threading.Event() 
                    1.1.1 上面这条代码表示通过threading模块的Event类创建的实例赋值给了tcp_server.__is_shut_down
                        执行Event类的构造方法init之后,tcp_server.__is_shut_down有两个属性:
                        tcp_server.__is_shut_down._cond = Condition(Lock())
                        1.1.1.1 上面这条代码,又通过threading模块的Condition类创建了一个实例,并赋值给了tcp_server.__is_shut_down._cond
                            Condition类init方法只需要传入一个参数,其值默认是lock=None,现在传入了参数Lock()。
                                1.1.1.1.1 参数Lock()不知道是类还是函数的返回值,Lock是个变量,其值是_allocate_lock,而_allocate_lock=_thread.allocate_lock即
                                    Lock()=_allocate_lock()=_thread.allocate_lock(),最后证明Lock是一个函数名,返回的结果是一个锁对象。    
                        tcp_server.__is_shut_down._flag = False
                    tcp_server.__shutdown_request = False
                1.2 执行完TCPServer类的父类BaseServer类的init方法后,再接着执行:
                    self.socket = socket.socket(self.address_family,self.socket_type)
                        其中address_family,socket_type是TCPServer类的类变量,
                            address_family = socket.AF_INET
                            socket_type = socket.SOCK_STREAM
                        即tcp_server.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
                1.3 因为bind_and_activate参数未传入TCPServer类的init方法,所以此参数默认是True,当此参数是True时,执行下面的代码。
                    if bind_and_activate:
                        try:
                            self.server_bind()
                            self.server_activate()
                        except:
                            self.server_close() 
                            raise
                    1.3.1 正常情况下首先执行self.server_bind(),就是执行下面的代码:
                        if self.allow_reuse_address: # allow_reuse_address 是TCPServer类的类变量,默认是False
                            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 所以此句代码不会执行。
                        self.socket.bind(self.server_address) # tcp_server.socket.bind(('192.168.131.1', 9999,))
                        self.server_address = self.socket.getsockname() # 重新赋值给server_address,getsockname方法返回的就是bind方法绑定的地址        
                    
                    1.3.2 再接着执行self.server_activate(),就是执行下面的代码:
                        self.socket.listen(self.request_queue_size) 
                        其中request_queue_size为TCPServer类的类变量,为5。
                    
                    1.3.3 发生异常时执行self.server_close(),即
                        # self.socket.close()
            至此,ThreadingTCPServer类的init构造方法执行完毕,创建的实例tcp_server具有下列属性。
            
                tcp_server.server_address = ('192.168.131.1', 9999,)
                tcp_server.RequestHandlerClass = MyServer
                tcp_server.__is_shut_down = threading.Event() 
                    tcp_server.__is_shut_down._cond = Condition(Lock())
                    tcp_server.__is_shut_down._flag = False
                tcp_server.__shutdown_request = False
                tcp_server.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
                同时,tcp_server.socket执行了下面两个方法:
                    tcp_server.socket.bind(('192.168.131.1', 9999,))
                    tcp_server.socket.listen(5) 
    
        '''
    
    
        tcp_server = socketserver.ThreadingTCPServer(('192.168.131.1', 9999,), MyServer)
    
    
    
        '''
        2. 执行tcp_server对象的serve_forever方法,寻找serve_forever方法的顺序是:
            ThreadingTCPServer类>>>ThreadingMixIn类>>>TCPServer类>>>BaseServer类.
            在BaseServer类中找到了serve_forever方法。
            2.1 执行BaseServer类中的serve_forever方法,就是执行下面代码:
                    self.__is_shut_down.clear()
                try:
                    with _ServerSelector() as selector:
                        selector.register(self, selectors.EVENT_READ)
        
                        while not self.__shutdown_request:
                            ready = selector.select(poll_interval)
                            # bpo-35017: shutdown() called during select(), exit immediately.
                            if self.__shutdown_request:
                                break
                            if ready:
                                self._handle_request_noblock()
        
                            self.service_actions()
                finally:
                    self.__shutdown_request = False
                    self.__is_shut_down.set()
                2.1.1 首先执行self.__is_shut_down.clear(),即tcp_server.__is_shut_down.clear()
                    也就是执行threading模块的Event类的实例的clear方法。其方法如下:
                        with self._cond: # 即 with tcp_server.__is_shut_down._cond
                            self._flag = False # 即 tcp_server.__is_shut_down._flag = False
                
                2.1.2 正常情况下,首先执行:with _ServerSelector() as selector:
                
                    _ServerSelector=selectors.SelectSelector,而SelectSelector是selectors模块下的一个类。
                    通过_ServerSelector()创建实例,就是通过selectors模块下的SelectSelector类创建实例。
                    执行selectors模块下的SelectSelector类的init方法时:
                    
                    2.1.2.1 首先需要执行其父类_BaseSelectorImpl的init方法,即下面代码:
                        # this maps file descriptors to keys
                        self._fd_to_key = {}
                        # read-only mapping returned by get_map()
                        self._map = _SelectorMapping(self)
                        也就是说 with _ServerSelector() as selector 中的selector对象具有下面2个属性了:
                            selector._fd_to_key = {}
                            selector._map = _SelectorMapping(selector)
                            2.1.2.1.2 上面代码selector._map = _SelectorMapping(self),就是通过_SelectorMapping创建了一个实例
                                在执行_SelectorMapping类的init方法时,传入一个参数,即selector这个对象自己。此时
                                    selector._map._selector = selector
                    
                    2.1.2.2 之后执行:
                        self._readers = set()
                        self._writers = set()
                        即_ServerSelector类的实例selector有下面两个属性了:
                        selector._readers = set()
                        selector._writers = set()
                    至此,with _ServerSelector() as selector 此条语句执行完毕,selector对象有下面4个属性:
                        selector._fd_to_key = {}
                        selector._map = _SelectorMapping(selector)
                            selector._map._selector = selector
                        selector._readers = set()
                        selector._writers = set()
                                 
                2.1.3 其次执行:selector.register(self, selectors.EVENT_READ)。即执行selector对象的register方法。
                    即执行:selector.register(fileobj=self, events=selectors.EVENT_READ, data=None)
                    其中selectors.EVENT_READ表示selectors模块下的EVENT_READ变量,而EVENT_READ= (1 << 0),即EVENT_READ=int(1*2**0)=1
                    其中selectors.EVENT_WRITE表示selectors模块下的EVENT_WRITE变量,而EVENT_WRITE= (1 << 1),即EVENT_WRITE=int(1*2**1)=2
                    
                    2.1.3.1 首先需要执行父类的register方法,并将返回值赋值给变量key。
                    
                        即_BaseSelectorImpl.register(fileobj=self, events=selectors.EVENT_READ, data=None),也就是执行下面代码:
                        
                            if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
                                raise ValueError("Invalid events: {!r}".format(events))
    
                            key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
                            此处SelectorKey=namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])函数的返回值,是Type[tuple]。
                            上面这句没看懂,导致下面key.fd也看不懂了,应当跟多路复用时的select.select方法差不多。                      
                            if key.fd in self._fd_to_key:
                                raise KeyError("{!r} (FD {}) is already registered".format(fileobj, key.fd))
    
                            self._fd_to_key[key.fd] = key
                            return key
                2.1.4 其次执行:ready = selector.select(poll_interval)
                    即执行selectors模块下的类SelectSelector类的select方法,如下:
                            if sys.platform == 'win32':
                                def _select(self, r, w, _, timeout=None):
                                    r, w, x = select.select(r, w, w, timeout)
                                    return r, w + x, []
                            else:
                                _select = select.select
                        
                            def select(self, timeout=None):
                                timeout = None if timeout is None else max(timeout, 0)
                                ready = []
                                try:
                                    r, w, _ = self._select(self._readers, self._writers, [], timeout)
                                except InterruptedError:
                                    return ready
                                r = set(r)
                                w = set(w)
                                for fd in r | w:
                                    events = 0
                                    if fd in r:
                                        events |= EVENT_READ  # 此句是什么意思,没看懂,原来是events要么等于0,或者等于1
                                    if fd in w:
                                        events |= EVENT_WRITE  # 按位或的意思,要么等于0,或者等于2
                                        # 原来是events = events | EVENT_WRITE,即events要么等于events或者等于 EVENT_WRITE
                        
                                    key = self._key_from_fd(fd)
                                    if key:
                                        ready.append((key, events & key.events))
                                return ready
                        需要查看别人的解释来理解,看https://www.cnblogs.com/zzzlw/p/9384308.html这篇文章
                2.1.5 如果ready不是空列表,代表有人连接或发送消息过来,则执行self._handle_request_noblock(),即执行tcp_server._handle_request_noblock(),即是如下代码:
                        try:
                            request, client_address = self.get_request()
                            
                        except OSError:
                            return
                        if self.verify_request(request, client_address):
                            try:
                                self.process_request(request, client_address)
                            except Exception:
                                self.handle_error(request, client_address)
                                self.shutdown_request(request)
                            except:
                                self.shutdown_request(request)
                                raise
                        else:
                            self.shutdown_request(request)
                        2.1.5.1 首先尝试执行 request, client_address = self.get_request(),即执行
                            tcp_server.get_request()方法。
                            tcp_server.get_request在ThreadingTCPServer类,ThreadingMixIn类都没找到,在TCPServer类中找到了,所以是执行TCPServer类中的get_request方法。
                               tcp_server.get_request()方法的返回值是self.socket.accept(),看到accept就知道了,
                               request,client_address = conn,addr = self.socket.accept() 
                       
                       2.1.5.2 没有异常时,再接着执行self.verify_request(request, client_address),这就是基类BaseServer的方法了,因为子类都没有这个方法,其返回值是True。
                       2.1.5.3 再尝试执行self.process_request(request, client_address),这是ThreadinMixIn类的方法。
                            tcp_server.process_request(request, client_address),即执行如下代码:
                                """Start a new thread to process the request."""
                                t = threading.Thread(target = self.process_request_thread,args = (request, client_address))
                                # 创建一个线程,线程处理的任务函数名是tcp_server.process_request_thread,任务函数的参数元组是(request, client_address)
                                t.daemon = self.daemon_threads
                                # daemon_threads是类变量,其值为False,block_on_close是类变量,其值是True。
                                if not t.daemon and self.block_on_close:
                                    if self._threads is None:
                                        # _threads是类变量,其值是None
                                        self._threads = []
                                    self._threads.append(t)
                                t.start()
                                # t.start()就是执行tcp_server.process_request_thread(request, client_address)     
                            2.1.5.3.1 在创建线程时,接收的任务函数名是tcp_server.process_request_thread,也在ThreadingMixIn类中。
                                其定义如下:
                                        def process_request_thread(self, request, client_address):
                                            try:
                                                self.finish_request(request, client_address)
                                            except Exception:
                                                self.handle_error(request, client_address)
                                            finally:
                                                self.shutdown_request(request)
                                2.1.5.3.1.1 尝试执行self.finish_request(request, client_address)时,实际调用的是
                                        self.RequestHandlerClass(request, client_address, self),也就是
                                        tcp_server.MyServer(request, client_address,tcp_server)
                                        也就是通过MyServer类创建了一个对象,而MyServer类并没有init方法,
                                        所以去MyServer类的父类BaseRequestsHandler类寻找init方法,其定义如下:
                                            def __init__(self, request, client_address, server):
                                                self.request = request
                                                self.client_address = client_address
                                                self.server = server
                                                self.setup()
                                                try:
                                                    self.handle()
                                                    # 因为MyServer类定义了handle方法,所以按照继承顺序,先调用MyServer类的handle方法,
                                                    # 而不是BaseRequestsHandler类的handle方法
                                                finally:
                                                    self.finish()
                                        # 也就是说直到这一步,才看到MyServer类的handle函数。
                                        # 花了4个小时才算分析完毕,真是折腾人啊
                                
                  
        '''
    
        tcp_server.serve_forever()
    
    
  • 相关阅读:
    代理工具介绍
    Cookie 相关
    在JavaScript 使用命名空间
    oracle rank()用法
    sql update 特殊用法
    Repeat 嵌套绑定
    .net 中隐式事务和显示事务的用法
    为站点添加迅雷下载和快车下载
    缺少MSVCR71.DLL解决方式
    关于AppFabric Caching的学习摘录
  • 原文地址:https://www.cnblogs.com/lanxing0422/p/pythonday15.html
Copyright © 2011-2022 走看看