zoukankan      html  css  js  c++  java
  • Day30--Python--struct, socketserver

    1. struct 
    struct.pack 打包
    def pack(fmt, *args): # known case of _struct.pack
        """
        pack(fmt, v1, v2, ...) -> bytes
        
        Return a bytes object containing the values v1, v2, ... packed according
        to the format string fmt.  See help(struct) for more on format strings.
        """
        return b""

    struct.unpack 解包
    def unpack(fmt, string): # known case of _struct.unpack
        """
        unpack(fmt, buffer) -> (v1, v2, ...)
        
        Return a tuple containing values unpacked according to the format string
        fmt.  The buffer's size in bytes must be calcsize(fmt). See help(struct)
        for more on format strings.
        """
        pass

    fmt 长度表






    # 粘包解决方案_3_服务端     struct.pack  打包
    
    import socket
    import struct
    import subprocess
    import time
    
    server = socket.socket()
    ip_port = ('192.168.15.87', 8001)
    server.bind(ip_port)
    server.listen(3)
    
    while 1:
        print('等待连接中...')
        conn, addr = server.accept()
        print('连接成功!')
        while 1:
            print('等待接收命令...')
            cmd = conn.recv(1024).decode('utf-8')
            if cmd == 'exit':
                break
            sub_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)  # 调用控制台, stdout标准输出,返回信息 stderr 标准错误,返回错误信息
            content = sub_obj.stdout.read()  # 读取标准输出,获取到是gbk编码的bytes
            error = sub_obj.stderr.read()  # 读取标准错误, 也是gbk编码的bytes
            len_of_content = len(content)   # 获取标准输出长度
            len_packed = struct.pack('i', len_of_content)   # 'i'表示打包成4个字节长度. 此处将数据长度打包成4个字节长度的数据 
    
            len_of_err = len(error)  # 获取标准错误长度
            len_err_packed = struct.pack('i', len_of_err)    # 'i' 表示打包成4个字节长度. 此处将错误信息长度打包成4个字节长度的数据
            # print(len_packed)  # 显示的是字节 b'xafx01x00x00'
            if len_of_content == 0:   # 当标准输出长度是零,也就是返回错误信息的时候
                conn.send(len_err_packed)  # 发送打包后的错误信息的长度
                print('数据长度发送成功!')
                conn.sendall(error)      # 循环着发送错误信息数据,防止数据过大造成缓冲区溢出   # 缓冲区大小 8kb  MTU 最大传输单元 1518b, 每次发送数据最好不超过这个数
                print('数据发送成功!')
            else:
                conn.send(len_packed)    # 发送打包后的标准输出信息长度
                print('数据长度发送成功!')  # 循环着发送标准输出信息数据,防止数据过大造成缓冲区溢出
                conn.sendall(content)
                print('数据发送成功!')
        conn.close()
        print('连接已断开!')
        time.sleep(3)
    # 粘包解决方案_3_客户端        struct.unpack 解包
    
    import socket
    import struct
    
    client = socket.socket()
    serverip_port = ('192.168.15.87', 8001)
    client.connect(serverip_port)
    
    while 1:
        cmd = input('请输入命令>>>')
        client.send(cmd.encode('utf-8'))
        if cmd == 'exit':
            break
        msg_len_return = client.recv(4)    # 先接收4个字节长度的打包信息
        msg_return_unpacked = struct.unpack('i', msg_len_return)[0]   # 拆包, 获取数据长度
        # print(msg_return_unpacked)   # struct.unpack('i', msg_len_return)返回一个元组 (431,),  取[0]得到长度431
    
        total_len = 0
        total_data = b''
    
        while total_len < msg_return_unpacked:
            data_splited = client.recv(1024)     # 分段接收信息,一次最多接收1024,防止超过MTU
            total_data += data_splited       # 把接收到的数据拼接到一起
            total_len += len(data_splited)     #  计算接收到的数据总长度
    
        print(total_data.decode('gbk'))      # 接收到的信息都是gbk编码的bytes,需要进行解码
    
    client.close()








    2. FTP 简单上传

       127.0.0.1 本机回环地址
    # 服务端
    
    import socket
    import struct
    import json
    
    # 用户上传文件储存位置
    file_storage_catalogue = r"D:python_workDay030 struct, socketserver, ftp上传文件uploads"
    
    server = socket.socket()
    ip_port = ('127.0.0.1', 8001)
    server.bind(ip_port)
    server.listen()
    print('等待连接中...')
    conn, addr = server.accept()
    print('连接成功!')
    file_info_len = struct.unpack('i', conn.recv(4))[0]
    file_info = conn.recv(file_info_len).decode('utf-8')
    file_info = json.loads(file_info)
    print(file_info)
    print(type(file_info))
    
    full_file_path = file_storage_catalogue + '\' + file_info['file_name']
    total_data_size = 0
    with open(full_file_path, 'wb') as f:
        while total_data_size < file_info['file_size']:
            data_slice = conn.recv(1024)
            total_data_size += len(data_slice)
            f.write(data_slice)
    
    conn.send('文件上传成功!'.encode('utf-8'))
    conn.close()
    server.close()
    # 客户端
    
    import socket
    import json
    import struct
    import os
    
    client = socket.socket()
    server_ip_port = ('127.0.0.1', 8001)
    client.connect(server_ip_port)
    read_size = 1024
    file_info = {
        'file_path': r'D:老男孩IT课件day30 粘包解决方案2+ftp上传+socketserver视频3 socketserver模块.mp4',
        'file_name': '03 socketserver模块.mp4',
        'file_size': None
    }
    
    
    file_info['file_size'] = os.path.getsize(file_info['file_path'])
    print('上传文件大小为:%sb' % file_info['file_size'])
    
    file_info_json = json.dumps(file_info, ensure_ascii=False).encode('utf-8')
    len_of_info = len(file_info_json)
    info_len_packed = struct.pack('i', len_of_info)
    client.send(info_len_packed)
    print(file_info_json)
    client.sendall(file_info_json)
    
    
    
    # total_data = b''
    total_data_len = 0
    with open(file_info['file_path'], 'rb') as f:
        while total_data_len < file_info['file_size']:
            data_slice = f.read(read_size)
            # total_data += data_slice
            total_data_len += len(data_slice)
            client.send(data_slice)
    
    ack = client.recv(1024).decode('utf-8')
    print(ack)
    client.close()

    3. socketserver 套接字服务器

    # 服务端同时接待多个客户端
    
    import socketserver
    
    
    class MyServer(socketserver.BaseRequestHandler):
    
        def handle(self):  # 重新定义父类BaseRequestHandler中的handle方法. (约束)
            print('Connected From:', self.cliend_address)  # 获取客户端地址和端口
            while 1:
                data_received = self.request.recv(1024)  # self.request 相当于 conn 连接通道
                if data_received == b'':      """ 当连接的客户端关close时,通道仍然没有关闭,里面没有内容,会循环打印b'', 此处判断如果对面关闭了,服务端对应的通道也关闭. 
    当对面直接敲回车或者发空字符串或者空字节b''过来,而且对方的通道没有关闭时,此处是收不到任何信息的,甚至不会有空字节b''过来,因为字节为空就是没有东西,没传东西就什么也过不来
    """
           self.request.close()
    break print('妲己宝宝说:%s' % data_received.decode('utf-8')) reply = input('亚瑟说>>>') self.request.send(reply.encode('utf-8')) if __name__ == '__main__': ip_port = ('127.0.0.1', 8001) socketserver.TCPServer.allow_reuse_address = True socketserver.TCPServer.request_queue_size = 10 # 设置监听数量,默认是5,可修改 server = socketserver.ThreadingTCPServer(ip_port, MyServer) # 绑定IP地址和端口,并启动刚才定义的MyServer类 server.serve_forever() # 永久的执行下去, 相当于accept()一直等待连接 print('这里是测试')
    # 客户端
    
    import socket
    
    client = socket.socket()
    server_ip_port = ('127.0.0.1', 8001)
    client.connect(server_ip_port)
    while 1:
        msg = input('发送消息, 按Q退出>>>')
        if msg.upper() == 'Q':
            break
        elif msg == '':     # 此处需要判断,如果输入的是空字符串'',空字节b'',或者直接敲回车,通道内不会存在任何数据,服务端不会收到任何消息,也就没法回复消息,客户端接不到回复,
                       程序会阻塞住
    print('内容不能为空!') continue client.send(msg.encode('utf-8')) reply = client.recv(1024).decode('utf-8') print('亚瑟说: %s' % reply) client.close()
    class TCPServer(BaseServer) 中的类变量,可以修改
    request_queue_size = 5
    allow_reuse_address = False
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    ThreadingTCPServer(ip_port, MyServer)  # 要找__init__
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass   # 自己类中没有__init__,根据MRO顺序找父类
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    class TCPServer(BaseServer):   # ThreadingMixIn 类中没有,继续在TCPServer中找到__init__
        def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):  # 找到了ip_port和MyServer对应的形参
            """Constructor.  May be extended, do not override."""
            BaseServer.__init__(self, server_address, RequestHandlerClass)  # 里面还有__init__, 找到了对应的形参
            self.socket = socket.socket(self.address_family,
                                        self.socket_type)
            if bind_and_activate:
                try:
                    self.server_bind()                                      # 调用server_bind()方法
                    self.server_activate()
                except:
                    self.server_close()
                    raise
    
        def server_bind(self):
            """Called by constructor to bind the socket.
    
            May be overridden.
    
            """
            if self.allow_reuse_address:                                     # allow_reuse_address = True的条件下
                self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind(self.server_address)                            # 绑定IP地址
            self.server_address = self.socket.getsockname()
    
        def server_activate(self):
            """Called by constructor to activate the server.
    
            May be overridden.
    
            """
            self.socket.listen(self.request_queue_size)                      # 监听. request_queue_size 类变量给了值,可修改
    
        def server_close(self):
            """Called to clean-up the server.
    
            May be overridden.
    
            """
            self.socket.close()                                             # 关闭
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    
    serve_forever() ==> _handle_request_noblock(self)
    
    class BaseServer:
    def _handle_request_noblock(self):
        """Handle one request, without blocking.
    
        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()                # 同conn, addr   此处get_request()相当于accept()
    
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    class BaseRequestHandler:                                           # 重新 定义 handle方法,调用handle方法
        def handle(self):
            pass
    View 梳理流程
    class BaseRequestHandler:
    
        """Base class for request handler classes.
    
        This class is instantiated for each request to be handled.  The
        constructor sets the instance variables request, client_address
        and server, and then calls the handle() method.  To implement a
        specific service, all you need to do is to derive a class which
        defines a handle() method.
    
        The handle() method can find the request as self.request, the
        client address as self.client_address, and the server (in case it
        needs access to per-server information) as self.server.  Since a
        separate instance is created for each request, the handle() method
        can define other arbitrary instance variables.
    
        """
    
        def __init__(self, request, client_address, server):
            self.request = request
            self.client_address = client_address
            self.server = server
            self.setup()
            try:
                self.handle()
            finally:
                self.finish()
    
        def setup(self):
            pass
    
        def handle(self):
            pass
    
        def finish(self):
            pass
    View class BaseRequestHandler
    class ThreadingMixIn:
        """Mix-in class to handle each request in a new thread."""
    
        # Decides how threads will act upon termination of the
        # main process
        daemon_threads = False
    
        def process_request_thread(self, request, client_address):
            """Same as in BaseServer but as a thread.
    
            In addition, exception handling is done here.
    
            """
            try:
                self.finish_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
            finally:
                self.shutdown_request(request)
    
        def process_request(self, request, client_address):
            """Start a new thread to process the request."""
            t = threading.Thread(target = self.process_request_thread,
                                 args = (request, client_address))
            t.daemon = self.daemon_threads
            t.start()
    View class ThreadingMixIn
    class TCPServer(BaseServer):
    
        """Base class for various socket-based server classes.
    
        Defaults to synchronous IP stream (i.e., TCP).
    
        Methods for the caller:
    
        - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
        - serve_forever(poll_interval=0.5)
        - shutdown()
        - handle_request()  # if you don't use serve_forever()
        - fileno() -> int   # for selector
    
        Methods that may be overridden:
    
        - server_bind()
        - server_activate()
        - get_request() -> request, client_address
        - handle_timeout()
        - verify_request(request, client_address)
        - process_request(request, client_address)
        - shutdown_request(request)
        - close_request(request)
        - handle_error()
    
        Methods for derived classes:
    
        - finish_request(request, client_address)
    
        Class variables that may be overridden by derived classes or
        instances:
    
        - timeout
        - address_family
        - socket_type
        - request_queue_size (only for stream sockets)
        - allow_reuse_address
    
        Instance variables:
    
        - server_address
        - RequestHandlerClass
        - socket
    
        """
    
        address_family = socket.AF_INET
    
        socket_type = socket.SOCK_STREAM
    
        request_queue_size = 5
    
        allow_reuse_address = False
    
        def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
            """Constructor.  May be extended, do not override."""
            BaseServer.__init__(self, server_address, RequestHandlerClass)
            self.socket = socket.socket(self.address_family,
                                        self.socket_type)
            if bind_and_activate:
                try:
                    self.server_bind()
                    self.server_activate()
                except:
                    self.server_close()
                    raise
    
        def server_bind(self):
            """Called by constructor to bind the socket.
    
            May be overridden.
    
            """
            if self.allow_reuse_address:
                self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind(self.server_address)
            self.server_address = self.socket.getsockname()
    
        def server_activate(self):
            """Called by constructor to activate the server.
    
            May be overridden.
    
            """
            self.socket.listen(self.request_queue_size)
    
        def server_close(self):
            """Called to clean-up the server.
    
            May be overridden.
    
            """
            self.socket.close()
    
        def fileno(self):
            """Return socket file number.
    
            Interface required by selector.
    
            """
            return self.socket.fileno()
    
        def get_request(self):
            """Get the request and client address from the socket.
    
            May be overridden.
    
            """
            return self.socket.accept()
    
        def shutdown_request(self, request):
            """Called to shutdown and close an individual request."""
            try:
                #explicitly shutdown.  socket.close() merely releases
                #the socket and waits for GC to perform the actual close.
                request.shutdown(socket.SHUT_WR)
            except OSError:
                pass #some platforms may raise ENOTCONN here
            self.close_request(request)
    
        def close_request(self, request):
            """Called to clean up an individual request."""
            request.close()
    View TCPServer
    class BaseServer:
    
        """Base class for server classes.
    
        Methods for the caller:
    
        - __init__(server_address, RequestHandlerClass)
        - serve_forever(poll_interval=0.5)
        - shutdown()
        - handle_request()  # if you do not use serve_forever()
        - fileno() -> int   # for selector
    
        Methods that may be overridden:
    
        - server_bind()
        - server_activate()
        - get_request() -> request, client_address
        - handle_timeout()
        - verify_request(request, client_address)
        - server_close()
        - process_request(request, client_address)
        - shutdown_request(request)
        - close_request(request)
        - service_actions()
        - handle_error()
    
        Methods for derived classes:
    
        - finish_request(request, client_address)
    
        Class variables that may be overridden by derived classes or
        instances:
    
        - timeout
        - address_family
        - socket_type
        - allow_reuse_address
    
        Instance variables:
    
        - RequestHandlerClass
        - socket
    
        """
    
        timeout = None
    
        def __init__(self, server_address, RequestHandlerClass):
            """Constructor.  May be extended, do not override."""
            self.server_address = server_address
            self.RequestHandlerClass = RequestHandlerClass
            self.__is_shut_down = threading.Event()
            self.__shutdown_request = False
    
        def server_activate(self):
            """Called by constructor to activate the server.
    
            May be overridden.
    
            """
            pass
    
        def serve_forever(self, poll_interval=0.5):
            """Handle one request at a time until shutdown.
    
            Polls for shutdown every poll_interval seconds. Ignores
            self.timeout. If you need to do periodic tasks, do them in
            another thread.
            """
            self.__is_shut_down.clear()
            try:
                # XXX: Consider using another file descriptor or connecting to the
                # socket to wake this up instead of polling. Polling reduces our
                # responsiveness to a shutdown request and wastes cpu at all other
                # times.
                with _ServerSelector() as selector:
                    selector.register(self, selectors.EVENT_READ)
    
                    while not self.__shutdown_request:
                        ready = selector.select(poll_interval)
                        if ready:
                            self._handle_request_noblock()
    
                        self.service_actions()
            finally:
                self.__shutdown_request = False
                self.__is_shut_down.set()
    
        def shutdown(self):
            """Stops the serve_forever loop.
    
            Blocks until the loop has finished. This must be called while
            serve_forever() is running in another thread, or it will
            deadlock.
            """
            self.__shutdown_request = True
            self.__is_shut_down.wait()
    
        def service_actions(self):
            """Called by the serve_forever() loop.
    
            May be overridden by a subclass / Mixin to implement any code that
            needs to be run during the loop.
            """
            pass
    
        # The distinction between handling, getting, processing and finishing a
        # request is fairly arbitrary.  Remember:
        #
        # - handle_request() is the top-level call.  It calls selector.select(),
        #   get_request(), verify_request() and process_request()
        # - get_request() is different for stream or datagram sockets
        # - process_request() is the place that may fork a new process or create a
        #   new thread to finish the request
        # - finish_request() instantiates the request handler class; this
        #   constructor will handle the request all by itself
    
        def handle_request(self):
            """Handle one request, possibly blocking.
    
            Respects self.timeout.
            """
            # Support people who used socket.settimeout() to escape
            # handle_request before self.timeout was available.
            timeout = self.socket.gettimeout()
            if timeout is None:
                timeout = self.timeout
            elif self.timeout is not None:
                timeout = min(timeout, self.timeout)
            if timeout is not None:
                deadline = time() + timeout
    
            # Wait until a request arrives or the timeout expires - the loop is
            # necessary to accommodate early wakeups due to EINTR.
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)
    
                while True:
                    ready = selector.select(timeout)
                    if ready:
                        return self._handle_request_noblock()
                    else:
                        if timeout is not None:
                            timeout = deadline - time()
                            if timeout < 0:
                                return self.handle_timeout()
    
        def _handle_request_noblock(self):
            """Handle one request, without blocking.
    
            I assume that selector.select() has returned that the socket is
            readable before this function was called, so there should be no risk of
            blocking in get_request().
            """
            try:
                request, client_address = self.get_request()
            except OSError:
                return
            if self.verify_request(request, client_address):
                try:
                    self.process_request(request, client_address)
                except Exception:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
                except:
                    self.shutdown_request(request)
                    raise
            else:
                self.shutdown_request(request)
    
        def handle_timeout(self):
            """Called if no new request arrives within self.timeout.
    
            Overridden by ForkingMixIn.
            """
            pass
    
        def verify_request(self, request, client_address):
            """Verify the request.  May be overridden.
    
            Return True if we should proceed with this request.
    
            """
            return True
    
        def process_request(self, request, client_address):
            """Call finish_request.
    
            Overridden by ForkingMixIn and ThreadingMixIn.
    
            """
            self.finish_request(request, client_address)
            self.shutdown_request(request)
    
        def server_close(self):
            """Called to clean-up the server.
    
            May be overridden.
    
            """
            pass
    
        def finish_request(self, request, client_address):
            """Finish one request by instantiating RequestHandlerClass."""
            self.RequestHandlerClass(request, client_address, self)
    
        def shutdown_request(self, request):
            """Called to shutdown and close an individual request."""
            self.close_request(request)
    
        def close_request(self, request):
            """Called to clean up an individual request."""
            pass
    
        def handle_error(self, request, client_address):
            """Handle an error gracefully.  May be overridden.
    
            The default is to print a traceback and continue.
    
            """
            print('-'*40, file=sys.stderr)
            print('Exception happened during processing of request from',
                client_address, file=sys.stderr)
            import traceback
            traceback.print_exc()
            print('-'*40, file=sys.stderr)
    
        def __enter__(self):
            return self
    
        def __exit__(self, *args):
            self.server_close()
    View BaseServer
  • 相关阅读:
    [速记]关于指针,引用和递归和解递归——C++
    查找(二)——基于二叉排序树的查找
    查找总结(一)-----简单查找和折半查找
    jdk代理和cglib代理
    IOC和AOP使用扩展 多种方式实现依赖注入
    InputStream和Reader
    Spring IoC
    Spring AOP(aspect oriented programming) 转载
    数据校验与国际化
    Struts2 之 实现文件上传(多文件)和下载
  • 原文地址:https://www.cnblogs.com/surasun/p/9811285.html
Copyright © 2011-2022 走看看