zoukankan      html  css  js  c++  java
  • Python模块解析之SocketServer(二)_BaseServer基类

      我们从上一篇文章中的类图可以看出 BaseServer 类是SocketServer模块中所有类的基类。BaseServer类规定了整个模块的框架,所以我们先仔细了解BaseServer类

    我们先把整个BaseServer的源码贴出来

      

      1 import socket
      2 import select
      3 import sys
      4 import os
      5 import errno
      6 try:
      7     import threading
      8 except ImportError:
      9     import dummy_threading as threading
     10 
     11 __all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
     12            "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
     13            "StreamRequestHandler","DatagramRequestHandler",
     14            "ThreadingMixIn", "ForkingMixIn"]
     15 if hasattr(socket, "AF_UNIX"):
     16     __all__.extend(["UnixStreamServer","UnixDatagramServer",
     17                     "ThreadingUnixStreamServer",
     18                     "ThreadingUnixDatagramServer"])
     19 
     20 def _eintr_retry(func, *args):
     21     """restart a system call interrupted by EINTR"""
     22     while True:
     23         try:
     24             return func(*args)
     25         except (OSError, select.error) as e:
     26             if e.args[0] != errno.EINTR:
     27                 raise
     28 
     29 class BaseServer:
     30 
     31     """Base class for server classes.
     32 
     33     Methods for the caller:
     34 
     35     - __init__(server_address, RequestHandlerClass)
     36     - serve_forever(poll_interval=0.5)
     37     - shutdown()
     38     - handle_request()  # if you do not use serve_forever()
     39     - fileno() -> int   # for select()
     40 
     41     Methods that may be overridden:
     42 
     43     - server_bind()
     44     - server_activate()
     45     - get_request() -> request, client_address
     46     - handle_timeout()
     47     - verify_request(request, client_address)
     48     - server_close()
     49     - process_request(request, client_address)
     50     - shutdown_request(request)
     51     - close_request(request)
     52     - handle_error()
     53 
     54     Methods for derived classes:
     55 
     56     - finish_request(request, client_address)
     57 
     58     Class variables that may be overridden by derived classes or
     59     instances:
     60 
     61     - timeout
     62     - address_family
     63     - socket_type
     64     - allow_reuse_address
     65 
     66     Instance variables:
     67 
     68     - RequestHandlerClass
     69     - socket
     70 
     71     """
     72 
     73     timeout = None
     74 
     75     def __init__(self, server_address, RequestHandlerClass):
     76         """Constructor.  May be extended, do not override."""
     77         self.server_address = server_address
     78         self.RequestHandlerClass = RequestHandlerClass
     79         self.__is_shut_down = threading.Event()
     80         self.__shutdown_request = False
     81 
     82     def server_activate(self):
     83         """Called by constructor to activate the server.
     84 
     85         May be overridden.
     86 
     87         """
     88         pass
     89 
     90     def serve_forever(self, poll_interval=0.5):
     91         """Handle one request at a time until shutdown.
     92 
     93         Polls for shutdown every poll_interval seconds. Ignores
     94         self.timeout. If you need to do periodic tasks, do them in
     95         another thread.
     96         """
     97         self.__is_shut_down.clear()
     98         try:
     99             while not self.__shutdown_request:
    100                 # XXX: Consider using another file descriptor or
    101                 # connecting to the socket to wake this up instead of
    102                 # polling. Polling reduces our responsiveness to a
    103                 # shutdown request and wastes cpu at all other times.
    104                 r, w, e = _eintr_retry(select.select, [self], [], [],
    105                                        poll_interval)
    106                 if self in r:
    107                     self._handle_request_noblock()
    108         finally:
    109             self.__shutdown_request = False
    110             self.__is_shut_down.set()
    111 
    112     def shutdown(self):
    113         """Stops the serve_forever loop.
    114 
    115         Blocks until the loop has finished. This must be called while
    116         serve_forever() is running in another thread, or it will
    117         deadlock.
    118         """
    119         self.__shutdown_request = True
    120         self.__is_shut_down.wait()
    121 
    122     # The distinction between handling, getting, processing and
    123     # finishing a request is fairly arbitrary.  Remember:
    124     #
    125 
    126     # - handle_request() is the top-level call.  It calls
    127     #   select, get_request(), verify_request() and process_request()
    128     # - get_request() is different for stream or datagram sockets
    129     # - process_request() is the place that may fork a new process
    130     #   or create a new thread to finish the request
    131     # - finish_request() instantiates the request handler class;
    132     #   this constructor will handle the request all by itself
    133 
    134     def handle_request(self):
    135         """Handle one request, possibly blocking.
    136 
    137         Respects self.timeout.
    138         """
    139         # Support people who used socket.settimeout() to escape
    140         # handle_request before self.timeout was available.
    141         timeout = self.socket.gettimeout()
    142         if timeout is None:
    143             timeout = self.timeout
    144         elif self.timeout is not None:
    145             timeout = min(timeout, self.timeout)
    146         fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
    147         if not fd_sets[0]:
    148             self.handle_timeout()
    149             return
    150         self._handle_request_noblock()
    151 
    152     def _handle_request_noblock(self):
    153         """Handle one request, without blocking.
    154 
    155         I assume that select.select has returned that the socket is
    156         readable before this function was called, so there should be
    157         no risk of blocking in get_request().
    158         """
    159         try:
    160             request, client_address = self.get_request()
    161         except socket.error:
    162             return
    163         if self.verify_request(request, client_address):
    164             try:
    165                 self.process_request(request, client_address)
    166             except:
    167                 self.handle_error(request, client_address)
    168                 self.shutdown_request(request)
    169 
    170     def handle_timeout(self):
    171         """Called if no new request arrives within self.timeout.
    172 
    173         Overridden by ForkingMixIn.
    174         """
    175         pass
    176 
    177     def verify_request(self, request, client_address):
    178         """Verify the request.  May be overridden.
    179 
    180         Return True if we should proceed with this request.
    181 
    182         """
    183         return True
    184 
    185     def process_request(self, request, client_address):
    186         """Call finish_request.
    187 
    188         Overridden by ForkingMixIn and ThreadingMixIn.
    189 
    190         """
    191         self.finish_request(request, client_address)
    192         self.shutdown_request(request)
    193 
    194     def server_close(self):
    195         """Called to clean-up the server.
    196 
    197         May be overridden.
    198 
    199         """
    200         pass
    201 
    202     def finish_request(self, request, client_address):
    203         """Finish one request by instantiating RequestHandlerClass."""
    204         self.RequestHandlerClass(request, client_address, self)
    205 
    206     def shutdown_request(self, request):
    207         """Called to shutdown and close an individual request."""
    208         self.close_request(request)
    209 
    210     def close_request(self, request):
    211         """Called to clean up an individual request."""
    212         pass
    213 
    214     def handle_error(self, request, client_address):
    215         """Handle an error gracefully.  May be overridden.
    216 
    217         The default is to print a traceback and continue.
    218 
    219         """
    220         print '-'*40
    221         print 'Exception happened during processing of request from',
    222         print client_address
    223         import traceback
    224         traceback.print_exc() # XXX But this goes to stderr!
    225         print '-'*40

    我们先从BaseServer的介绍文档来看:

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass) 初始化,传入构建一个Socket服务器所必需的address以及每个请求到达时的处理类
    - serve_forever(poll_interval=0.5) 启动服务器,服务器进入一个无限循环的状态,poll_interval用来设置循环间隔时间,默认为0.5秒。
    - shutdown()  关闭服务器
    - handle_request() # if you do not use serve_forever()  处理请求的方法,如果你没有调用server_forver方法,请求到达时将会调用此方法
    - fileno() -> int # for select()    文件描述符,相当于一个句柄。我们知道在Unix中一切皆为文件,系统操纵每个文件时通过 文件描述符对文件来进行文件操纵的。

    以上的方法都是一些基础的方法BaseServer里面已经实现,下面的一些方法可能就需要由子类来重新定义或覆盖

    Methods that may be overridden:

    - server_bind()  服务器绑定,被构造器调用
    - server_activate() 服务器激活
    - get_request() -> request, client_address 获取请求的socket
    - handle_timeout()  服务器超时处理
    - verify_request(request, client_address)  验证请求
    - server_close()  关闭服务器
    - process_request(request, client_address)  处理请求
    - shutdown_request(request)  关闭请求
    - close_request(request)   关闭请求
    - handle_error()   错误处理

    子类的方法

    Methods for derived classes:

    - finish_request(request, client_address)

    可能被子类或者实例覆盖的的类变量

    Class variables that may be overridden by derived classes or
    instances:

    - timeout 超时时间
    - address_family  地址类型
    - socket_type  socket类型 TCP/UDP
    - request_queue_size (only for stream sockets)  请求队列大小,只是针对 流socket
    - allow_reuse_address  是否允许重置监听的地址

    Instance variables:  实例变量

    - server_address  服务器地址(服务监听的地址)
    - RequestHandlerClass  请求处理类
    - socket   套接字

    """

    从上面BaseSever的介绍中我们已经大致了解了整个SocketSever的设计思路。构建一个SocketServer,定义SocketServer的一些基本操作 启动,关闭,处理请求,超时处理,错误处理。定义一个SocketServer所需要信息和步骤都一样所以它把他放在初始化构造器里面,由我们在创建BaseServer时传入。启动,关闭这些基础功能在BaseServer里面已经实现。针对我们最感兴趣的 "处理请求" 它把请求处理 抽离出来成一个类,让我们自己编写处理类,因为如何处理只有我们自己知道。把变化抽离出来,,这样提高了服务器可扩展功能。针对不同Socket的更详细的方法 当然就由不同的SocketServer子类来重新定义喽。

    有了上面的大致认识后,在研究BaseServer的源代码就有一个主体方向了:

    1 timeout = None

    定义一个 超时 类变量

    def __init__(self, server_address, RequestHandlerClass):
            """Constructor.  May be extended, do not override."""
            self.server_address = server_address
            self.RequestHandlerClass = RequestHandlerClass
            self.__is_shut_down = threading.Event()
            self.__shutdown_request = False

    初始化构造器,定义了 构造了一个SocketServer 所必须的最基本的server_address 以及我们自己编写的RequestHandlerClass类。我们看到一个 __is_shut_down私有成员变量 被赋值了一个 threading.Event()可以看出这个变量是用来控制 服务器关闭的。还有一个__shutdown_request标识是否关闭请求。

    1     def server_activate(self):
    2         """Called by constructor to activate the server.
    3 
    4         May be overridden.
    5 
    6         """
    7         pass

    服务器激活 由子类覆盖实现。

     1     def serve_forever(self, poll_interval=0.5):
     2         """Handle one request at a time until shutdown.
     3 
     4         Polls for shutdown every poll_interval seconds. Ignores
     5         self.timeout. If you need to do periodic tasks, do them in
     6         another thread.
     7         """
     8         self.__is_shut_down.clear()
     9         try:
    10             while not self.__shutdown_request:
    11                 # XXX: Consider using another file descriptor or
    12                 # connecting to the socket to wake this up instead of
    13                 # polling. Polling reduces our responsiveness to a
    14                 # shutdown request and wastes cpu at all other times.
    15                 r, w, e = _eintr_retry(select.select, [self], [], [],
    16                                        poll_interval)
    17                 if self in r:
    18                     self._handle_request_noblock()
    19         finally:
    20             self.__shutdown_request = False
    21             self.__is_shut_down.set()

    从这段代码中我们可以看出BaseServer是一个非阻塞的SocketServer.用Threading.Event来控制服务器的关闭,那么self.__is_shutdown是如何来控制服务器的关闭呢?

    我们接着看:

    1     def shutdown(self):
    2         """Stops the serve_forever loop.
    3 
    4         Blocks until the loop has finished. This must be called while
    5         serve_forever() is running in another thread, or it will
    6         deadlock.
    7         """
    8         self.__shutdown_request = True
    9         self.__is_shut_down.wait()

    从shutdown()方法中我们 通过调用self.__shutdown_request来停止 serve_forever的循环,但是通过调用self.__is_shut_down.wait()方法是它一直处于阻塞状态,知道调用self.__is_shut_down.set()后进程才算结束。也就是说知道处理完所有请求才关闭掉进程。如果server_forver()方法运行在另一个线程中这必须调用该方法否则会出现死锁。

    至于用select实现非阻塞的处理.关于threading.Event 和 select.select可以看我之前的专门介绍Event和select的文章。可能现在有点搞不懂,因为这要结合后面的代码 然后串起来 才能够明白。这里我们只需要明白是通过select来实现非阻塞的服务的,具体细节后面会结合代码来讨论。

    下面要介绍的两个方法至关重要,我们要结合 server_forver() 来 把整个服务器如何通过非阻塞方式来  接收请求 处理请求  这个最重要的处理步骤弄明白,

    我们先来看看 handler_request()的介绍:

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary. Remember:
    #

    一个请求的handling, getting, processing, finishing它们之间的区别是非常重要的。记住: 

    # - handle_request() is the top-level call. It calls
    # select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    # or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    # this constructor will handle the request all by itself

    handler_request()方法是最上层的调用.它调用了 select, get_Request(), verify_request() 和 process_request()方法.

    get_request()对于 流套接字(面向连接)和 数据包套接字(面向非连接)来说 是不同的.在Python socket编程中对于  stream(流套接字)接收请求调用socket.accept(),对于 datagram(数据包套接字)接收请求时调用socket.recvfrom().所以说这个方法由子类来实现。针对不同的套接字调用不同的方法。

    process_request()处理一个请求时可能会创建一个进程或者线程去处理。

    finish_request()完成请求 不同的实例所做的处理也不同。这个无关紧要。

     1     def handle_request(self):
     2         """Handle one request, possibly blocking.
     3 
     4         Respects self.timeout.
     5         """
     6         # Support people who used socket.settimeout() to escape
     7         # handle_request before self.timeout was available.
     8         timeout = self.socket.gettimeout()
     9         if timeout is None:
    10             timeout = self.timeout
    11         elif self.timeout is not None:
    12             timeout = min(timeout, self.timeout)
    13         fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
    14         if not fd_sets[0]:
    15             self.handle_timeout()
    16             return
    17         self._handle_request_noblock()

    支持请求超时,如果请求超时(在timeout时间内没有请求到达)则调用超时处理方法 hanler_timeout(),否则调用self._handle_request_noblock()

     1     def _handle_request_noblock(self):
     2         """Handle one request, without blocking.
     3 
     4         I assume that select.select has returned that the socket is
     5         readable before this function was called, so there should be
     6         no risk of blocking in get_request().
     7         """
     8         try:
     9             request, client_address = self.get_request()
    10         except socket.error:
    11             return
    12         if self.verify_request(request, client_address):
    13             try:
    14                 self.process_request(request, client_address)
    15             except:
    16                 self.handle_error(request, client_address)
    17                 self.shutdown_request(request)

    一个非阻塞的处理请求的方法。我们在回到  server_forever()方法中:

    1 self._handle_request_noblock()

    我想可能 下面这段代码 会更明白一些。

     1 import socket
     2 import traceback
     3 import select
     4 
     5 EOL1 = b'\n\n'
     6 EOL2 = b'\n\r\n'
     7 
     8 socketmap = {}
     9 r, w, e = [], [], []
    10 
    11 response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
    12 response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
    13 response += b'Hello, world!'
    14 
    15 serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    16 serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    17 serversocket.bind(('0.0.0.0', 23456))
    18 serversocket.listen(1)
    19 # serversocket.setblocking(0)
    20 
    21 listening_fileno = serversocket.fileno()
    22 socketmap[listening_fileno] = serversocket
    23 print 'listening_fileno', listening_fileno
    24 
    25 try:
    26     while True:
    27         r, w, e = [], [], []
    28         for fd in socketmap:
    29             r.append(fd)
    30             w.append(fd)
    31             e.append(fd)
    32         r, w, e = select.select(r, w, e, 1)
    33         for fd in r:
    34             request = b''
    35             isocket = socketmap[fd]
    36             if fd == listening_fileno:
    37                 print 'accepting'
    38                 clientsock, clientaddr = isocket.accept()
    39                 # clientsock.setblocking(0)
    40                 cli_fileno = clientsock.fileno()
    41                 r.append(cli_fileno)
    42                 w.append(cli_fileno)
    43                 e.append(cli_fileno)
    44                 socketmap[cli_fileno] = clientsock
    45             else:
    46                 print 'reading'
    47                 while EOL1 not in request and EOL2 not in request:
    48                     request += isocket.recv(1024)
    49                 print(request.decode())
    50 
    51         for fd in w:
    52             print 'writing'
    53             osocket = socketmap[fd]
    54             osocket.send(response)
    55             
    56         for fd in e:
    57             esocket = socketmap[fd]
    58             print 'socket close', fd
    59             esocket.close()
    60             del socketmap[fd]
    61 
    62         print "no data coming"
    63 
    64 except Exception, e:
    65     print traceback.print_exc()
    66     serversocket.close()

    我们利用了select实现了非阻塞socket服务.

    至于 后面的都是一些细节处理

     1     def handle_timeout(self):
     2         """Called if no new request arrives within self.timeout.
     3 
     4         Overridden by ForkingMixIn.
     5         """
     6         pass
     7 
     8     def verify_request(self, request, client_address):
     9         """Verify the request.  May be overridden.
    10 
    11         Return True if we should proceed with this request.
    12 
    13         """
    14         return True
    15 
    16     def process_request(self, request, client_address):
    17         """Call finish_request.
    18 
    19         Overridden by ForkingMixIn and ThreadingMixIn.
    20 
    21         """
    22         self.finish_request(request, client_address)
    23         self.shutdown_request(request)
    24 
    25     def server_close(self):
    26         """Called to clean-up the server.
    27 
    28         May be overridden.
    29 
    30         """
    31         pass
    32 
    33     def finish_request(self, request, client_address):
    34         """Finish one request by instantiating RequestHandlerClass."""
    35         self.RequestHandlerClass(request, client_address, self)
    36 
    37     def shutdown_request(self, request):
    38         """Called to shutdown and close an individual request."""
    39         self.close_request(request)
    40 
    41     def close_request(self, request):
    42         """Called to clean up an individual request."""
    43         pass
    44 
    45     def handle_error(self, request, client_address):
    46         """Handle an error gracefully.  May be overridden.
    47 
    48         The default is to print a traceback and continue.
    49 
    50         """
    51         print '-'*40
    52         print 'Exception happened during processing of request from',
    53         print client_address
    54         import traceback
    55         traceback.print_exc() # XXX But this goes to stderr!
    56         print '-'*40

    总结: 其实BaseServer只是大致搭了一个 模型 。控制整体的服务流程 启动服务->接收请求->处理请求 ,对于接收和处理 则 利用 select 实现非阻塞。把请求处理抽离出来,让我们自己实现。当然对于处理 请求 process_request() 它还提供了 让我们自己实现,使我们可以利用 进程和线程来处理 实现更高效。其实只要理解了 select 和上面我贴出的那段代码 BaseServer也就理解啦。后面我们会详细的介绍处理各个子类的实现细节。

  • 相关阅读:
    web测试学习大纲
    Python语言编程基础
    python文件IO
    Python 正则表达式
    python官网导航翻译
    python常用库
    python连接数据库
    sublime与python交互
    selenium连接浏览器方式
    sublime中运行python时编码格式问题
  • 原文地址:https://www.cnblogs.com/ArtsCrafts/p/Python_SocketServer_BaseServer.html
Copyright © 2011-2022 走看看