zoukankan      html  css  js  c++  java
  • socketserver源码解析和协程版socketserver

      来,贴上一段代码让你仰慕一下欧socketserver的魅力,看欧怎么完美实现多并发的魅力

    client

    import socket
    
    
    ip_port = ('127.0.0.1',8009)
    sk = socket.socket()
    sk.connect(ip_port)
    sk.settimeout(5)
    
    while True:
        data = sk.recv(1024)
        print('receive:',data.decode())
        inp = input('please input:')
        sk.sendall(inp.encode())
        if inp == 'exit':
            break
    
    sk.close()
    

     server

    '''
    对于socketserver,你需要做的事:
        定义个一类,继续socketserver.BaseRequestHandler
        重写handle方法
        把写好的类和端口进行实例,启动程序
    '''
    
    import socketserver
    
    class MyServer(socketserver.BaseRequestHandler):
    
        def handle(self):
            # print self.request,self.client_address,self.server
            conn = self.request
            conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.'.encode())
            Flag = True
            while Flag:
                data = conn.recv(1024).decode()
                if data == 'exit':
                    Flag = False
                elif data == '0':
                    conn.sendall('通过可能会被录音.balabala一大推'.encode())
                else:
                    conn.sendall('请重新输入.'.encode())
    
    
    if __name__ == '__main__':
        server = socketserver.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
        server.serve_forever()
    

    代码执行看源码

       我们看到最后两句代码,第一句--带有括号,我们第一想到的是要么是个函数,要是是个类,那么看一下什么吧

      是个类,继承了ThreadingMixIn和TCPServer两个类,好!实例对象肯定要找构造方法,当前这个类没有,就需要到父类中找了,从左往右

      我们发现ThreadingMixIn类里没有,那肯定在TCPServer类里啦,果真在,并且还执行了TCPServer的父类BaseServer里的构造方法

      在BaseServer里只是做了一些数据初始化的事,那我们回到TCPServer里构造方法里接着往下看吧

      执行父类构造方法后,实例一个socket对象,然后就是在if下执行self.server_bind()方法,要想知道这个方法,必须弄清楚self是指代谁?

      说到这里,我必须屡屡类的继承关系了...

      self是指实例对象,谁实例的,是ThreadingTCPServer类,所以找server_bind方法,要从ThreadingTCPServer往上找

      我们发现这个方法还是TCPServer里,

      似乎是做了绑定socket的事,这里过,在找看构造方法里,后又执行self.server_activate方法,我们按照那继承关系又在TCPServer找到了

      似乎是做了有关监听数的事,好这里构造方法执行完毕,第一代码也就这样看完了

      看到第二句,开始找serve_forever方法在哪了

      在BaseServer里,这方法大概说的是,一次处理一个请求直到连接关闭,如果处理其他要求另外开启一个线程

      我发现下有个self._handle_request_noblock(),我们看看这个是做啥的吧?

      还是在BaseServer,好像是做不阻塞的工作

      在这里还是执行了一个process_request方法,这个方法我们在ThreadingMixIn找到了

      主要是实例线程和启动线程的...

      没错,似乎我有了一点感悟--

      socketserver是基于多线程来做的,并做到多并发处理

    内部调用流程为:

    • 启动服务端程序
    • 执行 TCPServer.__init__ 方法,创建服务端Socket对象并绑定 IP 和 端口
    • 执行 BaseServer.__init__ 方法,将自定义的继承自SocketServer.BaseRequestHandler 的类 MyRequestHandle赋值给 self.RequestHandlerClass
    • 执行 BaseServer.server_forever 方法,While 循环一直监听是否有客户端请求到达 ...
    • 当客户端连接到达服务器
    • 执行 ThreadingMixIn.process_request 方法,创建一个 “线程” 用来处理请求
    • 执行 ThreadingMixIn.process_request_thread 方法
    • 执行 BaseServer.finish_request 方法,执行 self.RequestHandlerClass()  即:执行 自定义 MyRequestHandler 的构造方法(自动调用基类BaseRequestHandler的构造方法,在该构造方法中又会调用 MyRequestHandler的handle方法)

    服务类:

      SocketServer提供了5个基本的服务类:

      BaseServer: 基础类,由于下面四个网络服务类的继承

      TCPServer:针对TCP套接字流

      UDPServer:针对UDP数据报套接字

      UnixStreamServer:处理流式套接字,与TCPServer配合

      UnixDatagramServer:处理数据报套接字,与UDPServer配合

    异步处理类:

      这个四个服务类都是同步处理请求的。一个请求没处理完不能处理下一个请求。要想支持异步模型,可以利用多继承让server类继承ForkingMixIn 或 ThreadingMixIn。

      ForkingMixIn: 利用多进程(分叉)实现异步。(Mix-in class to handle each request in a new process)

      ThreadingMixIn: 利用多线程实现异步。(Mix-in class to handle each request in a new thread)

    请求处理类:

      要实现一项服务,还必须派生一个handler请求处理类,并重写父类的handle()方法。handle方法就是用来专门是处理请求的。该模块是通过服务类和请求处理类组合来处理请求的。

      SocketServer模块提供的请求处理类有BaseRequestHandler,以及它的派生类StreamRequestHandlerDatagramRequestHandler。从名字看出可以一个处理流式套接字,一个处理数据报套接字。

    #服务器端
    
    from SocketServer import TCPServer,StreamRequestHandler,
        ThreadingMixIn, ForkingMixIn
    
    
    
    #定义请求处理类
    class Handler(StreamRequestHandler):
        def handle(self):
            addr = self.request.getpeername()
            print 'connection:', addr
            while 1:
                self.request.sendall(self.request.recv(1024))
    
    
    #实例化服务类对象
    server = TCPServer(
        server_address=('127.0.0.1', 8123),     # address
        RequestHandlerClass=Handler             # 请求类
    )
    
    #开启服务
    server.serve_forever()
    
    #客户端
    
    import socket
    
    def socketClient():
        so = socket.socket()
        so.connect(('127.0.0.1', 8123))
        # so.close()
        while 1:
            so.sendall(raw_input('msg'))
            print so.recv(1024)
    
    if __name__ == '__main__':
        socketClient()
    

     多线程服务端

    from SocketServer import TCPServer,StreamRequestHandler,
        ThreadingMixIn, ForkingMixIn
    
    
    #定义基于多线程的服务类
    class Server(ThreadingMixIn, TCPServer):
        pass
    
    
    #定义请求处理类
    class Handler(StreamRequestHandler):
        def handle(self):
            addr = self.request.getpeername()
            print 'connection:', addr
            while 1:
                self.request.sendall(self.request.recv(1024))
    
    
    #实例化服务类对象
    server = Server(
        server_address=('127.0.0.1', 8123),     # address
        RequestHandlerClass=Handler             # 请求类
    )
    
    #开启服务
    server.serve_forever()
    

     源码分析:

    """普通的Socket服务类.
    
    socket服务:
    
    - address family:
            - AF_INET{,6}: IP (Internet Protocol) sockets (default)
            - AF_UNIX: Unix domain sockets
            - others, e.g. AF_DECNET are conceivable (see <socket.h>
    - socket type:
            - SOCK_STREAM (reliable stream, e.g. TCP)
            - SOCK_DGRAM (datagrams, e.g. UDP)
    
    请求服务类 (including socket-based):
    
    - 客户端地址验证之前进一步查看请求
            (实际上是一个请求处理的钩子在请求之前,例如logging)
    - 如何处理多个请求:
            - synchronous (同步:同一时间只能有一个请求)
            - forking (分叉:每个请求分配一个新的进程)
            - threading (线程:每个请求分配一个新的线程)
    
    
    
    五个类的继承关系如下:
    
            +------------+
            | BaseServer |
            +------------+
                  |
                  v
            +-----------+        +------------------+
            | TCPServer |------->| UnixStreamServer |
            +-----------+        +------------------+
                  |
                  v
            +-----------+        +--------------------+
            | UDPServer |------->| UnixDatagramServer |
            +-----------+        +--------------------+
    
    
    
    通过ForkingMixIn创建进程,通过ThreadingMixIn创建线程,如下实例:
    
            class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
    
    """
    
    __version__ = "0.4"
    
    
    import socket
    import select
    import sys
    import os
    import errno
    try:
        import threading
    except ImportError:
        import dummy_threading as threading
    
    __all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
               "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
               "StreamRequestHandler","DatagramRequestHandler",
               "ThreadingMixIn", "ForkingMixIn"]
    
    #family参数代表地址家族,比较常用的为AF_INET或AF_UNIX。
    #AF_UNIX用于同一台机器上的进程间通信,AF_INET对于IPV4协议的TCP和UDP 。           
    if hasattr(socket, "AF_UNIX"):
        __all__.extend(["UnixStreamServer","UnixDatagramServer",
                        "ThreadingUnixStreamServer",
                        "ThreadingUnixDatagramServer"])
    
    def _eintr_retry(func, *args):
        """重新启动系统调用EINTR中断"""
        while True:
            try:
                return func(*args)
            except (OSError, select.error) as e:
                if e.args[0] != errno.EINTR:
                    raise
    
    class BaseServer:
    
        """服务类的基类.
    
        调用方法:
    
        - __init__(server_address, RequestHandlerClass)
        - serve_forever(poll_interval=0.5)
        - shutdown()
        - handle_request()  # if you do not use serve_forever()
        - fileno() -> int   # for select()
    
        可以被重写的方法:
    
        - server_bind()
        - server_activate()
        - get_request() -> request, client_address
        - handle_timeout()
        - verify_request(request, client_address)
        - server_close()
        - process_request(request, client_address)
        - shutdown_request(request)
        - close_request(request)
        - handle_error()
    
        派生类(derived classes)方法:
    
        - finish_request(request, client_address)
    
        可以由派生类或重写类变量实例:
    
        - timeout
        - address_family
        - socket_type
        - allow_reuse_address
    
        实例变量:
    
        - RequestHandlerClass
        - socket
    
        """
    
        timeout = None
    
        def __init__(self, server_address, RequestHandlerClass):
            """初始化,能被扩展但不要重写."""
            self.server_address = server_address                # 地址元祖如('127.0.0.1', 8123)
            self.RequestHandlerClass = RequestHandlerClass      # 请求处理类
            self.__is_shut_down = threading.Event()             # 多线程通信机制
            self.__shutdown_request = False
    
        def server_activate(self):
            """通过构造函数激活服务器.可被重写."""
            pass
    
        def serve_forever(self, poll_interval=0.5):
            """在一个时间段内处理一个请求直到关闭.
    
            处理请求,直到一个明确的shutdown()请求。每poll_interval秒轮询一次shutdown。
            忽略self.timeout。如果你需要做周期性的任务,建议放置在其他线程。
            """
    
            self.__is_shut_down.clear()
            #Event是Python多线程通信的最简单的机制之一.一个线程标识一个事件,其他线程一直处于等待状态。
            #一个事件对象管理一个内部标示符,这个标示符可以通过set()方法设为True,通过clear()方法重新设为False,wait()方法则使线程一直处于阻塞状态,直到标示符变为True
            #也就是说我们可以通过 以上三种方法来多个控制线程的行为。
            try:
                while not self.__shutdown_request:
                    #考虑使用其他文件描述符或者连接socket去唤醒它取代轮询
                    #轮询减少在其他时间我们响应了关闭请求CPU。
                    r, w, e = _eintr_retry(select.select, [self], [], [],
                                           poll_interval)
                    if self in r:
                        self._handle_request_noblock()
            finally:
                self.__shutdown_request = False
                self.__is_shut_down.set()
    
        def shutdown(self):
            """终止serve_forever的循环.
    
            阻塞直到循环结束. 当serve_forever()方法正运行在另外的线程中必须调用它,否则会发生死锁.
            """
            self.__shutdown_request = True
            self.__is_shut_down.wait()
    
    
        # - handle_request() 是顶层调用.  它调用select,get_request(),verify_request()和process_request()
        # - get_request() 不同于流式和报文socket
        # - process_request() 产生进程的位置,或者产生线程去结束请求
        # - finish_request() 请求处理类的实例,此构造都将处理请求本身
    
        def handle_request(self):
            """处理一个请求, 可能阻塞.考虑self.timeout."""
            # Support people who used socket.settimeout() to escape
            # handle_request before self.timeout was available.
            timeout = self.socket.gettimeout()  # 返回当前超时期的值,如果没有设置超时期,则返回None
            if timeout is None:
                timeout = self.timeout
            elif self.timeout is not None:
                timeout = min(timeout, self.timeout)
            fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
            if not fd_sets[0]:
                self.handle_timeout()
                return
            self._handle_request_noblock()
    
        def _handle_request_noblock(self):
            """处理一个请求, 非阻塞."""
            try:
                request, client_address = self.get_request()
            except socket.error:
                return
            if self.verify_request(request, client_address):
                try:
                    self.process_request(request, client_address)
                except:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
    
        def handle_timeout(self):
            """超时处理。默认对于forking服务器是收集退出的子进程状态,threading服务器则什么都不做"""
            pass
    
        def verify_request(self, request, client_address):
            """
            返回一个布尔值,如果该值为True ,则该请求将被处理,反之请求将被拒绝。
            此功能可以重写来实现对服务器的访问控制。
            默认的实现始终返回True。client_address可以限定客户端,比如只处理指定ip区间的请求。 常用。
            """
            return True
    
        def process_request(self, request, client_address):
            """调用finish_request.被 ForkingMixIn and ThreadingMixIn重写
            如果用户提供handle()方法抛出异常,将调用服务器的handle_error()方法。
            如果self.timeout内没有请求收到, 将调用handle_timeout()并返回handle_request()。
    
            """
            self.finish_request(request, client_address)
            self.shutdown_request(request)
    
        def server_close(self):
            """关闭并清理server."""
            pass
    
        def finish_request(self, request, client_address):
            """通过请求类的实例结束请求,实际处理RequestHandlerClass发起的请求并调用其handle()方法。 常用."""
            self.RequestHandlerClass(request, client_address, self)
    
        def shutdown_request(self, request):
            """关闭结束一个单独的请求."""
            self.close_request(request)
    
        def close_request(self, request):
            pass
    
        def handle_error(self, request, client_address):
            """优雅的操作错误,可重写,默认打印异常并继续
            """
            print '-'*40
            print 'Exception happened during processing of request from',
            print client_address
            import traceback
            traceback.print_exc() # XXX But this goes to stderr!
            print '-'*40
    

    基于协程实现socket多并发

      在这里,首先我们先了解一下协程一个概念:

    • 协程又称微线程,单线程实现异步并发

    • 线程寄存在cpu里,而协程有自己的寄存器,上下文和栈

    • 由于协程本质上是单线程,所以不存在上下文切换花销,以及锁和同步的概念

    • 低成本,高并发,唯一不足的就是不能利用cpu的核资源

    • 你就记住协程干了这么一件事:遇IO阻塞就去做别的事了(socket连接就是IO操作)

      从上面的源码解析,我们知道,socketserver实现多并发本质就是多线程或多进程,但这样还是有些低效,你想啊,如果有几万客户连接过来,就要创建几万个线程,如果用的人其实不是很多,CPU还要不断的去检测socket客户端有没有传输数据,花销很大,效率就低

    server

    import sys
    import socket
    import time
    import gevent
     
    from gevent import socket,monkey
    monkey.patch_all()
     
     
    def server(port):
        s = socket.socket()
        s.bind(('0.0.0.0', port))
        s.listen(500)
        while True:
            cli, addr = s.accept()
            gevent.spawn(handle_request, cli)
     
     
     
    def handle_request(conn):
        try:
            while True:
                data = conn.recv(1024)
                print("recv:", data)
                conn.send(data)
                if not data:
                    conn.shutdown(socket.SHUT_WR)
     
        except Exception as  ex:
            print(ex)
        finally:
            conn.close()
    if __name__ == '__main__':
        server(8001)
    

     client

    import socket
     
    HOST = 'localhost'    # The remote host
    PORT = 8001           # The same port as used by the server
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HOST, PORT))
    while True:
        msg = bytes(input(">>:"),encoding="utf8")
        s.sendall(msg)
        data = s.recv(1024)
        #print(data)
     
        print('Received', repr(data))
    s.close()
    
  • 相关阅读:
    COM编程概述
    计算机系统
    计算机启动过程
    资源共享型智能指针实现方式
    [6] 智能指针boost::weak_ptr
    [5] 智能指针boost::shared_ptr
    [4] 智能指针boost::scoped_ptr
    函数后面加throw关键字
    [3] 智能指针std::auto_ptr
    (原创)开发使用Android studio所遇到的一些问题总结
  • 原文地址:https://www.cnblogs.com/xinsiwei18/p/5943563.html
Copyright © 2011-2022 走看看