1. struct
struct.pack 打包
def pack(fmt, *args): # known case of _struct.pack """ pack(fmt, v1, v2, ...) -> bytes Return a bytes object containing the values v1, v2, ... packed according to the format string fmt. See help(struct) for more on format strings. """ return b""
struct.unpack 解包
def unpack(fmt, string): # known case of _struct.unpack """ unpack(fmt, buffer) -> (v1, v2, ...) Return a tuple containing values unpacked according to the format string fmt. The buffer's size in bytes must be calcsize(fmt). See help(struct) for more on format strings. """ pass
fmt 长度表
# 粘包解决方案_3_服务端 struct.pack 打包 import socket import struct import subprocess import time server = socket.socket() ip_port = ('192.168.15.87', 8001) server.bind(ip_port) server.listen(3) while 1: print('等待连接中...') conn, addr = server.accept() print('连接成功!') while 1: print('等待接收命令...') cmd = conn.recv(1024).decode('utf-8') if cmd == 'exit': break sub_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # 调用控制台, stdout标准输出,返回信息 stderr 标准错误,返回错误信息 content = sub_obj.stdout.read() # 读取标准输出,获取到是gbk编码的bytes error = sub_obj.stderr.read() # 读取标准错误, 也是gbk编码的bytes len_of_content = len(content) # 获取标准输出长度 len_packed = struct.pack('i', len_of_content) # 'i'表示打包成4个字节长度. 此处将数据长度打包成4个字节长度的数据 len_of_err = len(error) # 获取标准错误长度 len_err_packed = struct.pack('i', len_of_err) # 'i' 表示打包成4个字节长度. 此处将错误信息长度打包成4个字节长度的数据 # print(len_packed) # 显示的是字节 b'xafx01x00x00' if len_of_content == 0: # 当标准输出长度是零,也就是返回错误信息的时候 conn.send(len_err_packed) # 发送打包后的错误信息的长度 print('数据长度发送成功!') conn.sendall(error) # 循环着发送错误信息数据,防止数据过大造成缓冲区溢出 # 缓冲区大小 8kb MTU 最大传输单元 1518b, 每次发送数据最好不超过这个数 print('数据发送成功!') else: conn.send(len_packed) # 发送打包后的标准输出信息长度 print('数据长度发送成功!') # 循环着发送标准输出信息数据,防止数据过大造成缓冲区溢出 conn.sendall(content) print('数据发送成功!') conn.close() print('连接已断开!') time.sleep(3)
# 粘包解决方案_3_客户端 struct.unpack 解包 import socket import struct client = socket.socket() serverip_port = ('192.168.15.87', 8001) client.connect(serverip_port) while 1: cmd = input('请输入命令>>>') client.send(cmd.encode('utf-8')) if cmd == 'exit': break msg_len_return = client.recv(4) # 先接收4个字节长度的打包信息 msg_return_unpacked = struct.unpack('i', msg_len_return)[0] # 拆包, 获取数据长度 # print(msg_return_unpacked) # struct.unpack('i', msg_len_return)返回一个元组 (431,), 取[0]得到长度431 total_len = 0 total_data = b'' while total_len < msg_return_unpacked: data_splited = client.recv(1024) # 分段接收信息,一次最多接收1024,防止超过MTU total_data += data_splited # 把接收到的数据拼接到一起 total_len += len(data_splited) # 计算接收到的数据总长度 print(total_data.decode('gbk')) # 接收到的信息都是gbk编码的bytes,需要进行解码 client.close()
2. FTP 简单上传
127.0.0.1 本机回环地址
# 服务端 import socket import struct import json # 用户上传文件储存位置 file_storage_catalogue = r"D:python_workDay030 struct, socketserver, ftp上传文件uploads" server = socket.socket() ip_port = ('127.0.0.1', 8001) server.bind(ip_port) server.listen() print('等待连接中...') conn, addr = server.accept() print('连接成功!') file_info_len = struct.unpack('i', conn.recv(4))[0] file_info = conn.recv(file_info_len).decode('utf-8') file_info = json.loads(file_info) print(file_info) print(type(file_info)) full_file_path = file_storage_catalogue + '\' + file_info['file_name'] total_data_size = 0 with open(full_file_path, 'wb') as f: while total_data_size < file_info['file_size']: data_slice = conn.recv(1024) total_data_size += len(data_slice) f.write(data_slice) conn.send('文件上传成功!'.encode('utf-8')) conn.close() server.close()
# 客户端 import socket import json import struct import os client = socket.socket() server_ip_port = ('127.0.0.1', 8001) client.connect(server_ip_port) read_size = 1024 file_info = { 'file_path': r'D:老男孩IT课件day30 粘包解决方案2+ftp上传+socketserver视频 3 socketserver模块.mp4', 'file_name': '03 socketserver模块.mp4', 'file_size': None } file_info['file_size'] = os.path.getsize(file_info['file_path']) print('上传文件大小为:%sb' % file_info['file_size']) file_info_json = json.dumps(file_info, ensure_ascii=False).encode('utf-8') len_of_info = len(file_info_json) info_len_packed = struct.pack('i', len_of_info) client.send(info_len_packed) print(file_info_json) client.sendall(file_info_json) # total_data = b'' total_data_len = 0 with open(file_info['file_path'], 'rb') as f: while total_data_len < file_info['file_size']: data_slice = f.read(read_size) # total_data += data_slice total_data_len += len(data_slice) client.send(data_slice) ack = client.recv(1024).decode('utf-8') print(ack) client.close()
3. socketserver 套接字服务器
# 服务端同时接待多个客户端 import socketserver class MyServer(socketserver.BaseRequestHandler): def handle(self): # 重新定义父类BaseRequestHandler中的handle方法. (约束) print('Connected From:', self.cliend_address) # 获取客户端地址和端口 while 1: data_received = self.request.recv(1024) # self.request 相当于 conn 连接通道 if data_received == b'': """ 当连接的客户端关close时,通道仍然没有关闭,里面没有内容,会循环打印b'', 此处判断如果对面关闭了,服务端对应的通道也关闭.
当对面直接敲回车或者发空字符串或者空字节b''过来,而且对方的通道没有关闭时,此处是收不到任何信息的,甚至不会有空字节b''过来,因为字节为空就是没有东西,没传东西就什么也过不来"""
self.request.close() break print('妲己宝宝说:%s' % data_received.decode('utf-8')) reply = input('亚瑟说>>>') self.request.send(reply.encode('utf-8')) if __name__ == '__main__': ip_port = ('127.0.0.1', 8001) socketserver.TCPServer.allow_reuse_address = True socketserver.TCPServer.request_queue_size = 10 # 设置监听数量,默认是5,可修改 server = socketserver.ThreadingTCPServer(ip_port, MyServer) # 绑定IP地址和端口,并启动刚才定义的MyServer类 server.serve_forever() # 永久的执行下去, 相当于accept()一直等待连接 print('这里是测试')
# 客户端 import socket client = socket.socket() server_ip_port = ('127.0.0.1', 8001) client.connect(server_ip_port) while 1: msg = input('发送消息, 按Q退出>>>') if msg.upper() == 'Q': break elif msg == '': # 此处需要判断,如果输入的是空字符串'',空字节b'',或者直接敲回车,通道内不会存在任何数据,服务端不会收到任何消息,也就没法回复消息,客户端接不到回复,
程序会阻塞住 print('内容不能为空!') continue client.send(msg.encode('utf-8')) reply = client.recv(1024).decode('utf-8') print('亚瑟说: %s' % reply) client.close()

class TCPServer(BaseServer) 中的类变量,可以修改 request_queue_size = 5 allow_reuse_address = False >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ThreadingTCPServer(ip_port, MyServer) # 要找__init__ >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass # 自己类中没有__init__,根据MRO顺序找父类 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class TCPServer(BaseServer): # ThreadingMixIn 类中没有,继续在TCPServer中找到__init__ def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): # 找到了ip_port和MyServer对应的形参 """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) # 里面还有__init__, 找到了对应的形参 self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: self.server_bind() # 调用server_bind()方法 self.server_activate() except: self.server_close() raise def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: # allow_reuse_address = True的条件下 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) # 绑定IP地址 self.server_address = self.socket.getsockname() def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size) # 监听. request_queue_size 类变量给了值,可修改 def server_close(self): """Called to clean-up the server. May be overridden. """ self.socket.close() # 关闭 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serve_forever() ==> _handle_request_noblock(self) class BaseServer: def _handle_request_noblock(self): """Handle one request, without blocking. I assume that selector.select() has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). """ try: request, client_address = self.get_request() # 同conn, addr 此处get_request()相当于accept() >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class BaseRequestHandler: # 重新 定义 handle方法,调用handle方法 def handle(self): pass

class BaseRequestHandler: """Base class for request handler classes. This class is instantiated for each request to be handled. The constructor sets the instance variables request, client_address and server, and then calls the handle() method. To implement a specific service, all you need to do is to derive a class which defines a handle() method. The handle() method can find the request as self.request, the client address as self.client_address, and the server (in case it needs access to per-server information) as self.server. Since a separate instance is created for each request, the handle() method can define other arbitrary instance variables. """ def __init__(self, request, client_address, server): self.request = request self.client_address = client_address self.server = server self.setup() try: self.handle() finally: self.finish() def setup(self): pass def handle(self): pass def finish(self): pass

class ThreadingMixIn: """Mix-in class to handle each request in a new thread.""" # Decides how threads will act upon termination of the # main process daemon_threads = False def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try: self.finish_request(request, client_address) except Exception: self.handle_error(request, client_address) finally: self.shutdown_request(request) def process_request(self, request, client_address): """Start a new thread to process the request.""" t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.start()

class TCPServer(BaseServer): """Base class for various socket-based server classes. Defaults to synchronous IP stream (i.e., TCP). Methods for the caller: - __init__(server_address, RequestHandlerClass, bind_and_activate=True) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you don't use serve_forever() - fileno() -> int # for selector Methods that may be overridden: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - process_request(request, client_address) - shutdown_request(request) - close_request(request) - handle_error() Methods for derived classes: - finish_request(request, client_address) Class variables that may be overridden by derived classes or instances: - timeout - address_family - socket_type - request_queue_size (only for stream sockets) - allow_reuse_address Instance variables: - server_address - RequestHandlerClass - socket """ address_family = socket.AF_INET socket_type = socket.SOCK_STREAM request_queue_size = 5 allow_reuse_address = False def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname() def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size) def server_close(self): """Called to clean-up the server. May be overridden. """ self.socket.close() def fileno(self): """Return socket file number. Interface required by selector. """ return self.socket.fileno() def get_request(self): """Get the request and client address from the socket. May be overridden. """ return self.socket.accept() def shutdown_request(self, request): """Called to shutdown and close an individual request.""" try: #explicitly shutdown. socket.close() merely releases #the socket and waits for GC to perform the actual close. request.shutdown(socket.SHUT_WR) except OSError: pass #some platforms may raise ENOTCONN here self.close_request(request) def close_request(self, request): """Called to clean up an individual request.""" request.close()

class BaseServer: """Base class for server classes. Methods for the caller: - __init__(server_address, RequestHandlerClass) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you do not use serve_forever() - fileno() -> int # for selector Methods that may be overridden: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - server_close() - process_request(request, client_address) - shutdown_request(request) - close_request(request) - service_actions() - handle_error() Methods for derived classes: - finish_request(request, client_address) Class variables that may be overridden by derived classes or instances: - timeout - address_family - socket_type - allow_reuse_address Instance variables: - RequestHandlerClass - socket """ timeout = None def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address self.RequestHandlerClass = RequestHandlerClass self.__is_shut_down = threading.Event() self.__shutdown_request = False def server_activate(self): """Called by constructor to activate the server. May be overridden. """ pass def serve_forever(self, poll_interval=0.5): """Handle one request at a time until shutdown. Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread. """ self.__is_shut_down.clear() try: # XXX: Consider using another file descriptor or connecting to the # socket to wake this up instead of polling. Polling reduces our # responsiveness to a shutdown request and wastes cpu at all other # times. with _ServerSelector() as selector: selector.register(self, selectors.EVENT_READ) while not self.__shutdown_request: ready = selector.select(poll_interval) if ready: self._handle_request_noblock() self.service_actions() finally: self.__shutdown_request = False self.__is_shut_down.set() def shutdown(self): """Stops the serve_forever loop. Blocks until the loop has finished. This must be called while serve_forever() is running in another thread, or it will deadlock. """ self.__shutdown_request = True self.__is_shut_down.wait() def service_actions(self): """Called by the serve_forever() loop. May be overridden by a subclass / Mixin to implement any code that needs to be run during the loop. """ pass # The distinction between handling, getting, processing and finishing a # request is fairly arbitrary. Remember: # # - handle_request() is the top-level call. It calls selector.select(), # get_request(), verify_request() and process_request() # - get_request() is different for stream or datagram sockets # - process_request() is the place that may fork a new process or create a # new thread to finish the request # - finish_request() instantiates the request handler class; this # constructor will handle the request all by itself def handle_request(self): """Handle one request, possibly blocking. Respects self.timeout. """ # Support people who used socket.settimeout() to escape # handle_request before self.timeout was available. timeout = self.socket.gettimeout() if timeout is None: timeout = self.timeout elif self.timeout is not None: timeout = min(timeout, self.timeout) if timeout is not None: deadline = time() + timeout # Wait until a request arrives or the timeout expires - the loop is # necessary to accommodate early wakeups due to EINTR. with _ServerSelector() as selector: selector.register(self, selectors.EVENT_READ) while True: ready = selector.select(timeout) if ready: return self._handle_request_noblock() else: if timeout is not None: timeout = deadline - time() if timeout < 0: return self.handle_timeout() def _handle_request_noblock(self): """Handle one request, without blocking. I assume that selector.select() has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). """ try: request, client_address = self.get_request() except OSError: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except Exception: self.handle_error(request, client_address) self.shutdown_request(request) except: self.shutdown_request(request) raise else: self.shutdown_request(request) def handle_timeout(self): """Called if no new request arrives within self.timeout. Overridden by ForkingMixIn. """ pass def verify_request(self, request, client_address): """Verify the request. May be overridden. Return True if we should proceed with this request. """ return True def process_request(self, request, client_address): """Call finish_request. Overridden by ForkingMixIn and ThreadingMixIn. """ self.finish_request(request, client_address) self.shutdown_request(request) def server_close(self): """Called to clean-up the server. May be overridden. """ pass def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" self.RequestHandlerClass(request, client_address, self) def shutdown_request(self, request): """Called to shutdown and close an individual request.""" self.close_request(request) def close_request(self, request): """Called to clean up an individual request.""" pass def handle_error(self, request, client_address): """Handle an error gracefully. May be overridden. The default is to print a traceback and continue. """ print('-'*40, file=sys.stderr) print('Exception happened during processing of request from', client_address, file=sys.stderr) import traceback traceback.print_exc() print('-'*40, file=sys.stderr) def __enter__(self): return self def __exit__(self, *args): self.server_close()