zoukankan      html  css  js  c++  java
  • ThreadingTCPServer源码解析

    实例

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    import SocketServer
    
    class Myserver(SocketServer.BaseRequestHandler):
    
        def handle(self):
            conn = self.request
            print self.client_address
            conn.sendall("我能同时处理多个请求!")
            flag = True
            while flag:
                data = conn.recv(1024)
                if data == "exit":
                    flag = False
                else:
                    conn.sendall(data)
    
    if __name__ == "__main__":
        # Listen on localhost:8000; ThreadingTCPServer runs Myserver.handle()
        # for each accepted connection in its own thread.
        listen_address = ("localhost", 8000)
        server = SocketServer.ThreadingTCPServer(listen_address, Myserver)
        server.serve_forever()

    client请求原理图

    源码解析图

    源码

    class ThreadingTCPServer(ThreadingMixIn, TCPServer):
        """TCPServer combined with ThreadingMixIn; adds no members of its own."""
    ThreadingTCPServer
    class ThreadingMixIn:
        """Mix-in class to handle each request in a new thread."""

        # Copied onto each worker thread's ``daemon`` attribute before the
        # thread is started (see process_request).
        daemon_threads = False

        def process_request_thread(self, request, client_address):
            """Same as in BaseServer but as a thread.

            In addition, exception handling is done here: an Exception raised
            by finish_request() is passed to handle_error(), and
            shutdown_request() always runs afterwards, even if handle_error()
            itself raises, so the request is never left open.
            """
            try:
                self.finish_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
            finally:
                self.shutdown_request(request)

        def process_request(self, request, client_address):
            """Start a new thread to process the request."""
            t = threading.Thread(target=self.process_request_thread,
                                 args=(request, client_address))
            t.daemon = self.daemon_threads
            t.start()
    ThreadingMixIn
    class TCPServer(BaseServer):

        """Base class for socket-based servers.

        Uses a synchronous IP stream socket (i.e., TCP) unless the class
        variables below are changed.

        Caller-facing methods:

        - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
        - serve_forever(poll_interval=0.5)
        - shutdown()
        - handle_request()  # if you don't use serve_forever()
        - fileno() -> int   # for select()

        Hooks that may be overridden:

        - server_bind()
        - server_activate()
        - get_request() -> request, client_address
        - handle_timeout()
        - verify_request(request, client_address)
        - process_request(request, client_address)
        - shutdown_request(request)
        - close_request(request)
        - handle_error()

        For use by derived classes:

        - finish_request(request, client_address)

        Class variables that derived classes or instances may override:

        - timeout
        - address_family
        - socket_type
        - request_queue_size (only for stream sockets)
        - allow_reuse_address

        Instance variables:

        - server_address
        - RequestHandlerClass
        - socket

        """

        # Arguments for socket.socket() in __init__.
        address_family = socket.AF_INET
        socket_type = socket.SOCK_STREAM

        # Backlog handed to listen() in server_activate().
        request_queue_size = 5

        # When true, server_bind() sets SO_REUSEADDR before binding.
        allow_reuse_address = False

        def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
            """Constructor.  May be extended, do not override."""
            BaseServer.__init__(self, server_address, RequestHandlerClass)
            self.socket = socket.socket(self.address_family, self.socket_type)
            if not bind_and_activate:
                return
            try:
                self.server_bind()
                self.server_activate()
            except:
                # Release the socket on any failure, then re-raise unchanged.
                self.server_close()
                raise

        def server_bind(self):
            """Bind the listening socket; called by the constructor.

            Afterwards server_address is replaced by the address the socket
            reports via getsockname().  May be overridden.

            """
            sock = self.socket
            if self.allow_reuse_address:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(self.server_address)
            self.server_address = sock.getsockname()

        def server_activate(self):
            """Start listening; called by the constructor.

            May be overridden.

            """
            backlog = self.request_queue_size
            self.socket.listen(backlog)

        def server_close(self):
            """Clean up the server by closing the listening socket.

            May be overridden.

            """
            self.socket.close()

        def fileno(self):
            """Return the listening socket's file number.

            Interface required by select().

            """
            return self.socket.fileno()

        def get_request(self):
            """Accept a connection and return what socket.accept() returns.

            May be overridden.

            """
            return self.socket.accept()

        def shutdown_request(self, request):
            """Shut down the write side of an individual request, then close it."""
            try:
                # Shut down explicitly: socket.close() merely releases the
                # socket and waits for GC to perform the actual close.
                request.shutdown(socket.SHUT_WR)
            except socket.error:
                # Some platforms may raise ENOTCONN here; ignore it.
                pass
            self.close_request(request)

        def close_request(self, request):
            """Clean up an individual request by closing it."""
            request.close()
    TCPServer
    class BaseServer:
        """Common server state: the address, the handler class and shutdown flags."""

        # Class-level default; not consulted by serve_forever().
        timeout = None

        def __init__(self, server_address, RequestHandlerClass):
            """Constructor.  May be extended, do not override."""
            # Private shutdown bookkeeping used by serve_forever(): a request
            # flag plus an Event that is set once the serving loop has exited.
            self.__shutdown_request = False
            self.__is_shut_down = threading.Event()
            self.RequestHandlerClass = RequestHandlerClass
            self.server_address = server_address
    ......
        def serve_forever(self, poll_interval=0.5):
            """Handle one request at a time until shutdown.
    
            Polls for shutdown every poll_interval seconds. Ignores
            self.timeout. If you need to do periodic tasks, do them in
            another thread.
            """
            self.__is_shut_down.clear()
            try:
                while not self.__shutdown_request:
                    r, w, e = _eintr_retry(select.select, [self], [], [],
                                           poll_interval)
                    if self in r:
                        self._handle_request_noblock()
            finally:
                self.__shutdown_request = False
                self.__is_shut_down.set()
    ............
        def _handle_request_noblock(self):
            """Handle one request, without blocking.
    
            I assume that select.select has returned that the socket is
            readable before this function was called, so there should be
            no risk of blocking in get_request().
            """
            try:
                request, client_address = self.get_request()
            except socket.error:
                return
            if self.verify_request(request, client_address):
                try:
                    self.process_request(request, client_address)
                except:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
            else:
                self.shutdown_request(request)
    
    ...........
        def finish_request(self, request, client_address):
            """Finish one request by instantiating RequestHandlerClass."""
            self.RequestHandlerClass(request, client_address, self)
    
        def shutdown_request(self, request):
            """Called to shutdown and close an individual request.

            This base implementation only delegates to close_request();
            TCPServer overrides it to shut down the socket's write side first.
            """
            self.close_request(request)
    BaseServer部分

    延伸

    实现多进程与实现多线程同理,只是改用 ForkingTCPServer 类。

    ForkingTCPServer源码:

    class ForkingTCPServer(ForkingMixIn, TCPServer):
        """TCPServer combined with ForkingMixIn; adds no members of its own."""
    class ForkingMixIn:
    
        """Mix-in class to handle each request in a new process.

        process_request() forks once per request.  The parent records the
        child's pid in active_children, and collect_children() reaps children
        that have exited.
        """
    
        # Seconds of inactivity after which handle_timeout() is meant to run
        # (see its docstring).
        timeout = 300
        # Set of pids of forked children; stays None until the first fork.
        active_children = None
        # While at least this many children are tracked, collect_children()
        # waits (blocking) for children to exit.
        max_children = 40
    
        def collect_children(self):
            """Internal routine to wait for children that have exited.

            Returns immediately if no child has been forked yet
            (active_children is still None).
            """
            if self.active_children is None:
                return
    
            # If we're above the max number of children, wait and reap them until
            # we go back below threshold. Note that we use waitpid(-1) below to be
            # able to collect children in size(<defunct children>) syscalls instead
            # of size(<children>): the downside is that this might reap children
            # which we didn't spawn, which is why we only resort to this when we're
            # above max_children.
            while len(self.active_children) >= self.max_children:
                try:
                    # options=0: block until some child process exits.
                    pid, _ = os.waitpid(-1, 0)
                    self.active_children.discard(pid)
                except OSError as e:
                    if e.errno == errno.ECHILD:
                        # we don't have any children, we're done
                        self.active_children.clear()
                    elif e.errno != errno.EINTR:
                        # Any other error ends the blocking phase; EINTR
                        # simply falls through and the loop retries.
                        break
    
            # Now reap all defunct children.
            # Iterate over a copy because the set is modified inside the loop.
            for pid in self.active_children.copy():
                try:
                    pid, _ = os.waitpid(pid, os.WNOHANG)
                    # if the child hasn't exited yet, pid will be 0 and ignored by
                    # discard() below
                    self.active_children.discard(pid)
                except OSError as e:
                    if e.errno == errno.ECHILD:
                        # someone else reaped it
                        self.active_children.discard(pid)
    
        def handle_timeout(self):
            """Wait for zombies after self.timeout seconds of inactivity.
    
            May be extended, do not override.
            """
            self.collect_children()
    
        def process_request(self, request, client_address):
            """Fork a new subprocess to process the request.

            Already-exited children are reaped first.  The parent records the
            new pid, closes its copy of the request and returns; the child
            handles the request and always terminates via os._exit().
            """
            self.collect_children()
            pid = os.fork()
            if pid:
                # Parent process
                if self.active_children is None:
                    # First fork: create the pid set lazily.
                    self.active_children = set()
                self.active_children.add(pid)
                self.close_request(request) #close handle in parent process
                return
            else:
                # Child process.
                # This must never return, hence os._exit()!
                try:
                    self.finish_request(request, client_address)
                    self.shutdown_request(request)
                    os._exit(0)
                except:
                    # Bare except is deliberate: whatever went wrong, the
                    # child reports it and then exits with status 1.
                    try:
                        self.handle_error(request, client_address)
                        self.shutdown_request(request)
                    finally:
                        # Exit even if error reporting itself fails.
                        os._exit(1)
    ForkingMixIn
  • 相关阅读:
    清除内联元素间默认的间隔
    Idea配置SpringBoot热部署
    导出下载模板
    Java 使用Graphics2D 进行画图
    【Modbus】Java使用 modbus-master-tcp 读取和写入Modbus服务器数据
    解决txt文件上传oss服务器乱码的问题
    docker java.lang.NoClassDefFoundError: org/bouncycastle/** 解决
    解决SpringBoot 报错 Error parsing HTTP request header
    React+AntDesign使用Tree树控件完整展现其中的层级关系,并具有展开收起选择等交互功能
    AntDesign使用Form表单出现You cannot set a form field before rendering a field associated with the value
  • 原文地址:https://www.cnblogs.com/chbo/p/7026891.html
Copyright © 2011-2022 走看看