前言
在本章节中,我们将探讨TCP协议基于流式传输的最大一个问题,即粘包问题。本章主要介绍TCP粘包的原理与其三种解决粘包的方案。并且还会介绍为什么UDP协议不会产生粘包。
基于TCP协议的socket实现远程命令输入
我们准备做一个可以在Client端远程执行Server端shell
命令并拿到其执行结果的程序,而涉及到网络通信就必然会出现socket
模块,关于如何抉择传输层协议的选择?我们选择使用TCP协议,因为它是可靠传输协议且数据量支持比UDP协议要大
Server端代码如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基于TCP协议的socket实现远程命令输入之Server ==== import subprocess from socket import * server = socket(AF_INET, SOCK_STREAM) server.bind(("0.0.0.0",6666)) # 放在远程填入0.0.0.0,放在本地填入127.0.0.1 server.listen(5) while 1: # 链接循环 conn,client_addr = server.accept() while 1: # 通信循环 try: # 防止Windows平台下Client端异常关闭导致双向链接崩塌Server端异常的情况发生 cmd = conn.recv(1024) if not cmd: # 防止类Unix平台下Client端异常关闭导致双向链接崩塌Server端异常的情况发生 break res = subprocess.Popen(cmd.decode("utf-8"), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,) stdout_res = res.stdout.read() # 正确结果 stderr_res = res.stderr.read() # 错误结果 # subprocess模块拿到的是bytes类型,所以直接发送即可 cmd_res = stdout_res if stdout_res else stderr_res # 因为两个结果只有一个有信息,所以我们只拿到有结果的那个 conn.send(cmd_res) except Exception: break conn.close() # 由于client端链接异常,故关闭链接循环
Client端代码如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基于TCP协议的socket实现远程命令输入之Client ==== from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect(("xxx.xxx.xxx.xxx",6666)) # 填入Server端公网IP while 1: cmd = input("请输入命令>>>:").strip() if not cmd: continue if cmd == "quit": break client.send(cmd.encode("utf-8")) cmd_res = client.recv(1024) # 本次接收1024字节数据 print(cmd_res.decode("utf-8")) # 如果Server端是Windows则用gbk解码,类Unix用utf-8解码 client.close()
测试结果:
粘包问题及其原理
上面的测试一切看起来都非常完美,但是是有一个BUG的。当我们如果读取一条非常长的命令实际上是会出问题的,比如:
这种现象被称之为粘包,那么为何会产生这样的现象呢?
这是由于
recv()
没有一次性读取完整个内核缓冲区的内容导致的。其实归根结底还是怪TCP是字节流方式传输数据。
我们来解析一下这种现象产生的原因:
由于我们的recv()
只是按照固定的1024去读取数据,那么一旦整体内核缓冲区中所存储的整体数据大于1024,就会产生粘包现象。所谓粘包问题主要还是因为接收方不知道消息之间的界限,不知道一次性提取多少字节的数据所造成的。
这里我还画了一幅图,可以方便读者理解:
那么我们可以通过不断的增大recv()
中的读取范围来解决这个问题吗?就像对应上图中的,一次性把快递柜包裹全取完,答案是不可以!你再大你也不可能大过内核缓冲区,这个东西都是有一个一定的阈值。一旦超出了这个阈值就会引发异常或者干脆无效。那么有什么好的办法呢?哈,下面会教给你一些解决办法的。不过在此之前我们要先看一个TCP协议特有的Nagle
算法。
Nagle算法与粘包
基于TCP协议的socket通信有一个特点,即:一方的
send()
与另一方的recv()
可以没有任何关系,即:一方send()
三次,另一方recv()
一次就可以将数据全部取出来。
TCP协议的发送方有一个特征。他会进行组包,如果一次发送的数据量很小,比如第一次发送10个字节,第二次发生2个字节,第三次发生3个字节。他可能会将这15个字节凑到一块发送出去,这是采用了Nagle
算法来进行的,这么做有一个弊端就是接收方想要将这个大的数据包按照发送方的发送次数精确无误的接收拆分成10 2 3必须要有发送方提供的拆包机制才行。
如下图组所示
发送方:
from socket import * ip_port = ("127.0.0.1",12306) buffer_size = 1024 back_log = 5 server = socket(AF_INET,SOCK_STREAM) server.bind(ip_port) server.listen(back_log) conn,addr = server.accept() conn.send("hello,".encode("utf-8")) # 第一次发送是6Bytes的数据 conn.send("world,".encode("utf-8")) # 第二次也是6Bytes的数据 conn.send("yunyaGG!!".encode("utf-8")) # 第三次是9Bytes的数据
接收方:
from socket import * ip_port = ("127.0.0.1",12306) buffer_size = 1024 client = socket(AF_INET,SOCK_STREAM) client.connect(ip_port) data_1 = client.recv(buffer_size) # 我们读取数据时统一用设定的 buffer_size 来读取 print("这是第一次的数据包:",data_1.decode("utf-8")) data_2 = client.recv(buffer_size) print("这是第二次的数据包:",data_2.decode("utf-8")) data_3 = client.recv(buffer_size) print("这是第三次的数据包:",data_3.decode("utf-8"))
接收结果:
# ==== 执行结果 ==== """ 这是第一次的数据包: hello, 这是第二次的数据包: world,yunyaGG!! 这是第三次的数据包: """
和预想的有点不太一样哈,居然把第二次和第三次组成了一个大的数据包发送过来了。这就是Nagle
算法,这样的组包策略很容易就会产生粘包。我不知道你是以什么样的方式发过来的,所以我recv()
就只能按照自己设定的方式去接收。
现在思考一下粘包的思路,我们的发送方需要将切分解包的规则告诉给接收方。
我们尝试改一下每一次的buffer_size
接收大小:
接收方:
from socket import * ip_port = ("127.0.0.1",12306) buffer_size = 1024 client = socket(AF_INET,SOCK_STREAM) client.connect(ip_port) data_1 = client.recv(6) # 我们手动的按照对方发送时的规则来进行拆包 print("这是第一次的数据包:",data_1.decode("utf-8")) data_2 = client.recv(6) print("这是第二次的数据包:",data_2.decode("utf-8")) data_3 = client.recv(9) print("这是第三次的数据包:",data_3.decode("utf-8"))
接收结果:
# ==== 执行结果 ==== """ 这是第一次的数据包: hello, 这是第二次的数据包: world, 这是第三次的数据包: yunyaGG!! """
粘包被我们手动的计算字节数来精确的分割数据接受量的大小给解决了,但是这样做是不现实的..我们不可能知道对方发送的数据到底是怎么样的,更不用说手动计算。所以有没有更好的解决方案呢?
解决方案1:预先发送消息长度
好了,其实上面关于解决粘包的思路已经出来了。我们需要做的就是让接收方知道本次发送内容的大小,接收方才能够精确的将所有数据全部提取出来不产生遗漏。其实实现方式很简单,可以尝试以下思路:
1.发送方发送一个此次数据固定的长度
2.接收方接收到该数据长度并且回应
3.发送方收到回应并且发送真正的数据
4.接收方不断的用默认的
buffer_size
值接收新的数据并存储起来直到超出整个数据的长度,代表此处数据全部接收完毕
Server端:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基于TCP协议的socket实现远程命令输入之Server ==== import subprocess from socket import * server = socket(AF_INET, SOCK_STREAM) server.bind(("0.0.0.0", 6666)) # 放在远程填入0.0.0.0 放在本地测试填入127.0.0.1 server.listen(5) while 1: # 链接循环 conn, client_addr = server.accept() while 1: # 通信循环 try: # 防止Windows平台下Client端异常关闭导致双向链接崩塌Server端异常的情况发生 cmd = conn.recv(1024) if not cmd: # 防止类Unix平台下Client端异常关闭导致双向链接崩塌Server端异常的情况发生 break res = subprocess.Popen(cmd.decode("utf-8"), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout_res = res.stdout.read() # 正确结果 stderr_res = res.stderr.read() # 错误结果 # subprocess模块拿到的是bytes类型,所以直接发送即可 cmd_res = stdout_res if stdout_res else stderr_res # 因为两个结果只有一个有信息,所以我们只拿到有结果的那个 msg_length = len(cmd_res) # 本次数据的长度 conn.send(str(msg_length).encode("utf-8")) # 先将要发的整体内容长度发送过去 if conn.recv(1024) == b"ready": # 如果接收方回应了ready则开始发送真正的数据体 conn.send(cmd_res) except Exception: break conn.close() # 由于client端链接异常,故关闭链接循环
Client端:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基于TCP协议的socket实现远程命令输入之Client ==== from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(("xxx.xxx.xxx.xxx", 6666)) # 填入Server端公网IP while 1: cmd = input("请输入命令>>>:").strip() if not cmd: continue if cmd == "quit": break client.send(cmd.encode("utf-8")) msg_length = int(client.recv(1024).decode("utf-8")) # 接收到此次发送内容的整体长度 recv_length = 0 # 代表已接收的内容长度 cmd_res = b"" client.send(b"ready") # 发送给Server端,代表自己已经接收到此次内容长度,可以发送真正的数据啦 while recv_length < msg_length: cmd_res += client.recv(1024) # 本次接收1024字节数据,可能是一小节数据 recv_length += len(cmd_res) # 添加上本次读取的长度,当全部读取完后应该 recv_length == msg_length else: print(cmd_res.decode("utf-8")) # 如果Server端是Windows则用gbk解码,类Unix用utf-8解码 client.close()
结果如下:
解决方案2:json+struct方案
其实上面的解决方案还是有一些弊端,因为Server端是发送了2次send()
,第1次发送数据整体长度,第2次发送数据内容主体,这样其实是不太好的(Server端可能同时处理多个链接,所以send()
次数越少越好),而且如果Server端传的是一个文件的话那么局限性就太强了。因为我们只能将整体的消息长度发送过去而诸如文件名,文件大小之内的信息就发送不过去。
所以我们需要一个更加完美的解决方案,即Server端发送一次send()
就将本次的数据整体长度发送过去(还可以包括文件姓名,文件大小等信息。)
struct
模块使用介绍
struct
模块可以将其某一种数据格式序列化为固定长度的Bytes
类型,其中最重要的两个方法就是pack()
、unpack()
。
pack(fmt,*args)
: 根据格式将其转换为Bytes
类型
unpack(fmt,string)
:根据格式将Bytes
类型数据反解为其原本的形式
格式 | C语言类型 | Python类型 | 字节数大小 |
---|---|---|---|
x | 填充字节 | 没有值 | |
c | char | 字节长度为1 | 1 |
b | signed char | 整数 | 1 |
B | unsigned char | 整数 | 1 |
? | _Bool | bool | 1 |
h | short | 整数 | 2 |
H | unsigned short | 整数 | 2 |
i | int | 整数 | 4 |
I | unsigned int | 整数 | 4 |
l | long | 整数 | 4 |
L | unsigned long | 整数 | 4 |
q | long long | 整数 | 8 |
Q | unsigned long long | 整数 | 8 |
n | ssize_t | 整数 | |
N | size_t | 整数 | |
f | float | 浮点数 | 4 |
d | double | 浮点数 | 8 |
s | char[] | 字节 | |
p | char[] | 字节 | |
P | void * | 整数 |
使用演示:
>>> import struct >>> b1 = struct.pack("i",12) # 尝试将 int类型的12进行序列化,得到一个4字节的对象 >>> b1 b'x0cx00x00x00' >>> struct.unpack("i",b1) # 尝试将12的序列化对象字节进行反解,得出元组,第1位就是需要的数据。 (12,) >>>
好了,了解到这里我们就可以开始进行改写了。
Server端代码如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基于TCP协议的socket实现远程命令输入之Server ==== import json import struct import subprocess from socket import * server = socket(AF_INET, SOCK_STREAM) server.bind(("0.0.0.0", 6666)) # 放在远程填入0.0.0.0 放在本地测试填入127.0.0.1 server.listen(5) while 1: # 链接循环 conn, client_addr = server.accept() while 1: # 通信循环 try: # 防止Windows平台下Client端异常关闭导致双向链接崩塌Server端异常的情况发生 cmd = conn.recv(1024) if not cmd: # 防止类Unix平台下Client端异常关闭导致双向链接崩塌Server端异常的情况发生 break res = subprocess.Popen(cmd.decode("utf-8"), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout_res = res.stdout.read() # 正确结果 stderr_res = res.stderr.read() # 错误结果 # subprocess模块拿到的是bytes类型,所以直接发送即可 cmd_res = stdout_res if stdout_res else stderr_res # 因为两个结果只有一个有信息,所以我们只拿到有结果的那个 # 解决粘包:构建字典,包含数据主体长度,这个就相当于其头部信息 head_msg = { "msg_length": len(cmd_res), # 包含数据主体部分的长度 # 如果是文件,还可以添加file_name,file_size等属性。 } # 序列化成json格式,并且统计其头部的长度 head_data = json.dumps(head_msg).encode("utf-8") head_length = struct.pack("i", len(head_data)) # 得到4字节的头部信息,里面包含头部的长度 # 发送头部长度信息,头部数据,与真实数据部分 conn.send(head_length + head_data + cmd_res) except Exception: break conn.close() # 由于client端链接异常,故关闭链接循环
Client端代码如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基于TCP协议的socket实现远程命令输入之Client ==== import json import struct from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(("xxx.xxx.xxx.xxx", 6666)) # 填入Server端公网IP while 1: cmd = input("请输入命令>>>:").strip() if not cmd: continue if cmd == "quit": break client.send(cmd.encode("utf-8")) # 发送终端命令 # 解决粘包 head_length = struct.unpack("i", client.recv(4))[0] # 接收到头部的长度信息 head_data = json.loads(client.recv(head_length)) # 接收到真实的头部信息 msg_length = head_data["msg_length"] # 获取到数据主体的长度信息 recv_length = 0 # 代表已接收的内容长度 cmd_res = b"" # 开始获取真正的数据主体信息 while recv_length < msg_length: cmd_res += client.recv(1024) # 本次接收1024字节数据,可能是一小节数据 recv_length += len(cmd_res) # 添加上本次读取的长度,当全部读取完后应该 recv_length == msg_length else: print(cmd_res.decode("utf-8")) # 如果Server端是Windows则用gbk解码,类Unix用utf-8解码 client.close()
思想如下:
1.Server端构建自身的数据头部分,其中包含数据体整体长度,如果传输的是文件的话还可以包含文件名,文件大小等信息
2.将数据头部分
json
序列化后再转换为Bytes
类型3.使用
struct.pack()
模块获取数据头的长度,得到一个长度为4的Bytes
类型4.Server端将 数据头长度 + 数据头部分 + 数据体部分 全部发送给Client端
5. Client端
recv()
接收值改为4,拿到数据头长度Bytes类型6. Client端使用
struct.unpack(
数据头长度Bytes类型)
模块反解出数据头真实的长度7. Client端使用
recv()
接收值为数据头真实的长度拿到真正的数据头8. 通过
json
反序列化出真正的数据头,在到其中取出数据体的长度9. 开始
while
循环不断的读取真实的数据体数据
解决方案3:iter()与偏函数(失败案例)
上面那么做看似完美但还是美中不足。因为内存缓冲区本来就是只能取一次值,和迭代器很像,只能迭代一次便不能继续迭代了。基于这一点我们来做一个终极优化:
还记得iter()
方法吗?iter()
方法除开创建迭代器外实际上还有一个参数:
def iter(source, sentinel=None): # known special case of iter """ iter(iterable) -> iterator iter(callable, sentinel) -> iterator Get an iterator from an object. In the first form, the argument must supply its own iterator, or be a sequence. In the second form, the callable is called until it returns the sentinel. """ pass
我们来试试这个参数做什么用的。
li = [1, 2, 3, 4] def my_iter(): return li.pop() res = iter(my_iter, 2) # 代表这个迭代器没__next__一下就会执行my_iter函数,并且该函数返回值如果是2则终止迭代 print(res.__next__()) # 4 print(res.__next__()) # 3 print(res.__next__()) # StopIteration
第二个参数看来可以设置迭代的终点。
那么偏函数是什么呢?偏函数可以设定一个固定的参数给第一个位置的值
效果如下:
from functools import partial # 导入偏函数 def add(x, y): return x + y func = partial(add, 1) # 设置辨寒暑绑定的第一个参数的值 print(func(1)) # 2 print(func(5)) # 6
现在我们仔细回想,当缓冲区的消息接收完毕后为空的状态是会变成 b""
的形式。那么这个时候我们可以使用iter()
方法设置为不断的取出缓存中的值直到出现b""
,而偏函数可以对recv()
函数进行设置让它始终取一个值,最后通过join来拼接出取出的所有值即可。
可以使用 "".join(iter(partial(tcp_clien.recv,back_log)),b"")
我们尝试用函数来查看一下效果:
from functools import partial # 导入偏函数 li = [b"","1","2","3","4","5"] # 模拟内核缓冲区 def test(buffer_size): if buffer_size: # 模拟recv的数据大小 return li.pop() print("buffer_size必须为一个int类型的值") res = "".join(iter(partial(test,1024),b"")) print(res) # 54321 # join()方法会不断的调用iter()下的__next__,每调用一次就执行一次偏函数。知道出现b""停止
最后我们发现,这样的做法是会产生recv()
阻塞的,总体来说还是不能够成功。因为join()
方法会不断的执行,即使内核缓冲区的数据被recv()
读完了也不会终止迭代而是继续阻塞下次的recv()
,故这种方式宣告失败。(还是iter()
的第二个参数导致的,或许读取完后内核缓冲区中的数据并不是b""
)
测试的Server端代码如下:
from socket import * import subprocess import struct ip_port=('127.0.0.1',8080) back_log=5 buffer_size=1024 tcp_server=socket(AF_INET,SOCK_STREAM) tcp_server.bind(ip_port) tcp_server.listen(back_log) while True: conn,addr=tcp_server.accept() print('新的Client链接',addr) while True: #收 try: cmd=conn.recv(buffer_size) if not cmd:break print('收到Client的命令',cmd) #执行命令,得到命令的运行结果cmd_res res=subprocess.Popen(cmd.decode('utf-8'),shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, stdin=subprocess.PIPE) err=res.stderr.read() if err: cmd_res=err else: cmd_res=res.stdout.read() #发 if not cmd_res: cmd_res='执行成功'.encode('gbk') length=len(cmd_res) data_length=struct.pack('i',length) conn.send(data_length) conn.send(cmd_res) except Exception as e: print(e) break
测试的Client代码如下:
from socket import * import struct from functools import partial #偏函数 ip_port=('127.0.0.1',8080) back_log=5 buffer_size=1024 tcp_client=socket(AF_INET,SOCK_STREAM) tcp_client.connect(ip_port) while True: cmd=input('>>: ').strip() if not cmd:continue if cmd == 'quit':break tcp_client.send(cmd.encode('utf-8')) #解决粘包 length_data=tcp_client.recv(4) length=struct.unpack('i',length_data)[0] #第一种方法 recv_size=0 recv_msg=b'' while recv_size < length: #为何recv里是buffer_size,不是length,因为length如果为24G,系统内存没有那么大 #所以每次buffer_size,当recv_size < length时,循环接收,直到recv_size =length,退出循环 recv_msg += tcp_client.recv(buffer_size) recv_size=len(recv_msg) #1024 #第二种方法 失败版本,会引发recv()的阻塞,而不会终止迭代。因为join()方法会不断的调用其iter()方法产生的迭代器,也就是调用其__next__方法,所以第二次没消息的recv()会阻塞住。 #recv_msg=''.join(iter(partial(tcp_client.recv, buffer_size), b'')) print('命令的执行结果是 ',recv_msg.decode('gbk')) tcp_client.close()
UDP协议为何不会产生粘包
UDP协议是面向消息的协议,每一次的
sendto()
与recvfrom()
必须一一对应,否则就会收不到消息。
UDP是面向消息的协议,每个UDP段都是一条消息,每sendto()
一次就是发送一次消息,而不管接收方有没有收到消息发送方只管自己的发送任务,这也是UDP被称为不可靠传输协议的由来。接收端的套接字缓冲区采用了链式的结构来记录每一个到达的UDP包,在每一个UDP包中都有了消息头,包括端口,消息源等等..于是UDP就能够去区分出一个明确的消息定义,即面向消息的通信是有消息边界的,所以UDP的传输叫做数据报的形式。
并且每一次recvform()
的buffer_size
最大值如果不够获取完全部的内核缓冲区里的数据的话,那么只会收够指定的最大字节数量(即buffer_size
的设定值),剩余的就不要了。所以UDP不会存在粘包,多么干脆利落...
我们还是用一个快递员的那个图来进行演示:
还有一点需要注意一下。使用UDP协议进行通信的时候不管首先启动哪一方都不会报错,因为它只管发,不管有没有人接收。
所以,这也是我称UDP协议比较随便的原因。