一、socket(单链接)
1、socket:应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面;也有人将socket说成ip+port,ip是用来标识互联网中的一台主机的位置,而port是用来标识这台机器上的一个应用程序,ip地址是配置到网卡上的,而port是应用程序开启的,ip与port的绑定就标识了互联网中独一无二的一个应用程序;而程序的pid是同一台机器上不同进程或者线程的标识。
2、套接字:用于在同一台主机上多个应用程序之间的通讯。套接字有两种(或者称为有两个种族),分别是基于文件型(AF_UNIX)和基于网络型(AF_INET)。
3、基于TCP的套接字(类型一)
工作原理:先从服务器端说起。服务器端先初始化Socket,然后与端口绑定(bind),对端口进行监听(listen),调用accept阻塞,等待客户端连接。在这时如果有个客户端初始化一个Socket,然后连接服务器(connect),如果连接成功,这时客户端与服务器端的连接就建立了。客户端发送数据请求,服务器端接收请求并处理请求,然后把回应数据发送给客户端,客户端读取数据,最后关闭连接,一次交互结束
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket #导入socket模块 5 ip_port = ("127.0.0.1",9999) #设置服务器ip和端口 6 server = socket.socket() #创建server实例 //声明socket类型同时生成socket对象 7 server.bind(ip_port) #套接字绑定ip与端口 8 server.listen(5) #监听连接//允许5个客户端排队 9 conn,addr = server.accept() #等待客户端连接 // 客户端连接后,返回新的套接字与IP地址 10 client_data = conn.recv(1024) #接收数据//把接收的数据实例化 11 #client_data = b'hello' 12 conn.sendall(client_data.upper()) #把数据发送到客户端 //upper() 字母变成大写 13 conn.close() #关闭连接
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket #导入socket模块 5 ip_port = ("127.0.0.1",9999) #设置服务器ip和端口 6 client = socket.socket() #创建client实例 7 client.connect(ip_port) #设置要连接的ip和端口 8 info = "hello world" #要发送的数据 9 client.sendall(info.encode("utf-8")) #发送数据// 把str转换为bytes类型 10 server_data = client.recv(1024) #接收数据 11 client.close() #关闭连接
1 ① server = socket.socket() 2 套接字格式:socket(family,type[,protocal]) 使用给定的地址族、套接字类型、协议编号(默认为0)来创建套接字。 3 参数一:地址簇 4 socket.AF_INET IPv4(默认) 5 socket.AF_INET6 IPv6 6 socket.AF_UNIX 只能够用于单一的Unix系统进程间通信 7 参数二:类型 8 socket.SOCK_STREAM 流式socket , for TCP (默认) 9 socket.SOCK_DGRAM 数据报式socket , for UDP 10 socket.SOCK_RAW 原始套接字,普通的套接字无法处理ICMP、IGMP等网络报文,而SOCK_RAW可以;其次,SOCK_RAW也可以处理特殊的IPv4报文;此外,利用原始套接字,可以通过IP_HDRINCL套接字选项由用户构造IP头。 11 socket.SOCK_RDM 是一种可靠的UDP形式,即保证交付数据报但不保证顺序。SOCK_RAM用来提供对原始协议的低级访问,在需要执行某些特殊操作时使用,如发送ICMP报文。SOCK_RAM通常仅限于高级用户或管理员运行的程序使用。 12 socket.SOCK_SEQPACKET 可靠的连续数据包服务 13 参数三:协议 14 0 (默认)与特定的地址家族相关的协议,如果是 0 ,则系统就会根据地址格式和套接类别,自动选择一个合适的协议 15 #创建TCP Socket:server=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 16 #创建UDP Socket:server=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 17 注意点: 18 1)TCP发送数据时,已建立好TCP连接,所以不需要指定地址。UDP是面向无连接的,每次发送要指定是发给谁。 19 2)服务端与客户端不能直接发送列表,元组,字典。需要字符串化repr(data) 20 ② server.bind(address) 21 将套接字绑定到地址。address地址的格式取决于地址族。在AF_INET下,以元组(host,port)的形式表示地址 22 ③ server.listen(backlog) 23 开始监听传入连接。backlog指定在拒绝连接之前,可以挂起的最大连接数量。该值至少为1,大部分应用程序设为5就可以了。backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5,这个值不能无限大,因为要在内核中维护连接队列 24 ④ server.setblocking(bool) 25 是否阻塞(默认True),如果设置False,那么accept和recv时一旦无数据,则报错 26 ⑤ conn,addr = server.accept() 27 接受连接并返回(conn,address),其中conn是新的套接字对象,可以用来接收和发送数据。address是连接客户端的地址。接收TCP 客户的连接(阻塞式)等待连接的到来 28 ⑥ client.connect(address) 29 连接到address处的套接字。一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。 30 ⑦ client.connect_ex(address) 31 同上,只不过会有返回值,连接成功时返回 0 ,连接失败时候返回编码,例如:10061 32 ⑧ client.close() 33 关闭套接字 34 ⑨ client.recv(bufsize[,flag]) 35 接受套接字的数据。数据以字符串形式返回,bufsize指定最多可以接收的数量。flag提供有关消息的其他信息,通常可以忽略 36 ⑩ client.recvfrom(bufsize[.flag]) 37 与recv()类似,但返回值是(data,address)。其中data是包含接收数据的字符串,address是发送数据的套接字地址 38 ⑪ server.send(string[,flag]) 39 发送TCP数据;将string中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于string的字节大小。即:可能未将指定内容全部发送 40 ⑫ server.sendall(string[,flag]) 41 完整发送TCP数据;将string中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回None,失败则抛出异常;内部通过递归调用send,将所有内容发送出去 42 ⑬ server.sendto(string[,flag],address) 43 将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。该函数主要用于UDP协议 44 ⑭ sk.settimeout(timeout) 45 设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如 client 连接最多等待5s ) 46 ⑮ sk.getpeername() 47 返回连接套接字的远程地址。返回值通常是元组(ipaddr,port) 48 ⑯ sk.getsockname() 49 返回套接字自己的地址。通常是一个元组(ipaddr,port) 50 ⑰ sk.fileno() 51 套接字的文件描述符 52 53 54 ###服务端套接字函数### 55 s.bind() #绑定(主机,端口号)到套接字 56 s.listen() #开始TCP监听 57 s.accept() #被动接受TCP客户的连接,(阻塞式)等待连接的到来 58 59 ###客户端套接字函数### 60 s.connect() #主动初始化TCP服务器连接 61 s.connect_ex() #connect()函数的扩展版本,出错时返回出错码,而不是抛出异常 62 63 ###公共用途的套接字函数### 64 s.recv() #接收TCP数据 65 s.send() #发送TCP数据(send在待发送数据量大于己端缓存区剩余空间时,数据丢失,不会发完) 66 s.sendall() #发送完整的TCP数据(本质就是循环调用send,sendall在待发送数据量大于己端缓存区剩余空间时,数据不丢失,循环调用send直到发完) 67 s.recvfrom() #接收UDP数据 68 s.sendto() #发送UDP数据 69 s.getpeername() #连接到当前套接字的远端的地址 70 s.getsockname() #当前套接字的地址 71 s.getsockopt() #返回指定套接字的参数 72 s.setsockopt() #设置指定套接字的参数 73 s.close() #关闭套接字 74 75 ###面向锁的套接字方法### 76 s.setblocking() #设置套接字的阻塞与非阻塞模式 77 s.settimeout() #设置阻塞套接字操作的超时时间 78 s.gettimeout() #得到阻塞套接字操作的超时时间 79 80 ###面向文件的套接字的函数### 81 s.fileno() #套接字的文件描述符 82 s.makefile() #创建一个与该套接字相关的文件
4、基于UDP的套接字(类型二)
udp是无链接的,先启动哪一端都不会报错且可以同时多个客户端去跟服务端通信
1 #UDP server 2 ss = socket() #创建一个服务器的套接字 3 ss.bind() #绑定服务器套接字 4 inf_loop: #服务器无限循环 5 cs = ss.recvfrom()/ss.sendto() # 对话(接收与发送) 6 ss.close() # 关闭服务器套接字 7 8 9 #UDP client 10 cs = socket() # 创建客户套接字 11 comm_loop: # 通讯循环 12 cs.sendto()/cs.recvfrom() # 对话(发送/接收) 13 cs.close() # 关闭客户套接字
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 ip_port=('127.0.0.1',9000) 6 BUFSIZE=1024 7 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 8 udp_server_client.bind(ip_port) 9 10 while True: 11 msg,addr=udp_server_client.recvfrom(BUFSIZE) 12 print(msg,addr) 13 udp_server_client.sendto(msg.upper(),addr)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 ip_port=('127.0.0.1',9000) 6 BUFSIZE=1024 7 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 8 9 while True: 10 msg=input('>>: ').strip() 11 if not msg:continue 12 udp_server_client.sendto(msg.encode('utf-8'),ip_port) 13 back_msg,addr=udp_server_client.recvfrom(BUFSIZE) 14 print(back_msg.decode('utf-8'),addr) 15 udp_client_socket.close()
##qq聊天(由于udp无连接,所以可以同时多个客户端去跟服务端通信)##
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 ip_port=('127.0.0.1',8081) 6 udp_server_sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) #买手机 7 udp_server_sock.bind(ip_port) 8 9 while True: 10 qq_msg,addr=udp_server_sock.recvfrom(1024) 11 print('来自[%s:%s]的一条消息: 33[1;44m%s 33[0m' %(addr[0],addr[1],qq_msg.decode('utf-8'))) 12 back_msg=input('回复消息: ').strip() 13 14 udp_server_sock.sendto(back_msg.encode('utf-8'),addr)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 BUFSIZE=1024 6 udp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 7 8 qq_name_dic={ 9 '狗哥alex':('127.0.0.1',8081), 10 '瞎驴':('127.0.0.1',8081), 11 '一棵树':('127.0.0.1',8081), 12 '武大郎':('127.0.0.1',8081), 13 } 14 15 16 while True: 17 qq_name=input('请选择聊天对象: ').strip() 18 while True: 19 msg=input('请输入消息,回车发送: ').strip() 20 if msg == 'quit':break 21 if not msg or not qq_name or qq_name not in qq_name_dic:continue 22 udp_client_socket.sendto(msg.encode('utf-8'),qq_name_dic[qq_name]) 23 24 back_msg,addr=udp_client_socket.recvfrom(BUFSIZE) 25 print('来自[%s:%s]的一条消息: 33[1;44m%s 33[0m' %(addr[0],addr[1],back_msg.decode('utf-8'))) 26 27 udp_client_socket.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 BUFSIZE=1024 6 udp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 7 8 qq_name_dic={ 9 '狗哥alex':('127.0.0.1',8081), 10 '瞎驴':('127.0.0.1',8081), 11 '一棵树':('127.0.0.1',8081), 12 '武大郎':('127.0.0.1',8081), 13 } 14 15 16 while True: 17 qq_name=input('请选择聊天对象: ').strip() 18 while True: 19 msg=input('请输入消息,回车发送: ').strip() 20 if msg == 'quit':break 21 if not msg or not qq_name or qq_name not in qq_name_dic:continue 22 udp_client_socket.sendto(msg.encode('utf-8'),qq_name_dic[qq_name]) 23 24 back_msg,addr=udp_client_socket.recvfrom(BUFSIZE) 25 print('来自[%s:%s]的一条消息: 33[1;44m%s 33[0m' %(addr[0],addr[1],back_msg.decode('utf-8'))) 26 27 udp_client_socket.close()
5、粘包现象:①发送端需要等缓冲区满才发送出去,造成粘包(发送数据时间间隔很短,数据了很小,TCP有一个Nagle算法会把数据合到一起,产生粘包)
②接收方不及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)
原因:接收方不知道消息之间的界限,不知道一次性提取多少字节的数据。
TCP有粘包现象,UDP永远不会粘包:tcp是基于数据流的,收发两端都要有一一成对的socket,TCP采用Nagle优化算法消息进行消息处理机制,面向流的通信是无消息保护边界的。而udp是基于数据报的,支持的是一对多的模式,套接字缓冲区采用了链式结构来记录每一个到达的UDP包,在每个UDP包中添加了消息头(消息来源地址,端口等信息),面向消息的通信是有消息保护边界的。
tcp是可靠传输,udp是不可靠传输:tcp在数据传输时,发送端先把数据发送到自己的缓存中,然后协议控制将缓存中的数据发往对应端,对应端返回一个ack=1,发送端则清理缓存中的数据,对端返回ack=0,则重新发送数据,所以tcp是可靠的;而udp发送数据,对端是不会返回确认信息的,因此不可靠。
解决粘包:问题的根源在于,接收端不知道发送端将要传送的字节流的长度,所以解决粘包的方法就是围绕,如何让发送端在发送数据前,把自己将要发送的字节流总大小让接收端知晓,然后接收端来一个死循环接收完所有数据。
第一种解决方案:(low)
1 #server端 2 3 #!/usr/bin/env python 4 # -*- coding:utf-8 -*- 5 # _author_soloLi 6 from socket import * #由于 socket 模块中有太多的属性。我们在这里破例使用了'from module import *'语句。使用 'from socket import *',我们就把 socket模块里的所有属性都带到我们的命名空间里了,这样能 大幅减短我们的代码。 7 import subprocess 8 ip_port=('127.0.0.1',8080) 9 back_log=5 10 buffer_size=1024 11 12 server=socket(AF_INET,SOCK_STREAM) 13 server.bind(ip_port) 14 server.listen(back_log) 15 16 while True: #链接循环 17 conn,addr=server.accept() 18 print('新的客户端链接',addr) 19 while True: #通信循环 20 21 ##//收数据//## 22 try: 23 cmd=conn.recv(buffer_size) 24 if not cmd:break 25 print('收到客户端的命令',cmd) 26 27 #执行命令,得到命令的运行结果cmd_res 28 29 #subprocess模块提供了一种一致的方法来创建和处理附加进程,与标准库中的其它模块相比,提供了一个更高级的接口。用于替换如下模块:os.system() , os.spawnv() , os和popen2模块中的popen()函数,以及 commands(). 30 res=subprocess.Popen(cmd.decode('utf-8'),shell=True, #解码(bytes->str) 31 stderr=subprocess.PIPE, 32 stdout=subprocess.PIPE, 33 stdin=subprocess.PIPE) 34 35 err=res.stderr.read() 36 if err: 37 cmd_res=err 38 else: 39 cmd_res=res.stdout.read() ##编码是以当前所在的系统为准的,如果是windows,那么res.stdout.read()读出的就是GBK编码的,在接收端需要用GBK解码且只能从管道里读一次结果 40 41 ##//发数据//## 42 if not cmd_res: 43 cmd_res='执行成功'.encode('gbk') #转换编码(gbk->unicode)(转换显示中文的字符串) 44 45 length=len(cmd_res) 46 conn.send(str(length).encode('utf-8')) #编码(str->bytes) 47 client_ready=conn.recv(buffer_size) 48 if client_ready == b'ready': 49 conn.send(cmd_res) 50 except Exception as e: 51 print(e) 52 break
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 from socket import * 5 ip_port=('127.0.0.1',8080) 6 back_log=5 7 buffer_size=1024 8 9 client=socket(AF_INET,SOCK_STREAM) 10 client.connect(ip_port) 11 12 while True: 13 cmd=input('>>: ').strip() 14 if not cmd:continue 15 if cmd == 'quit':break 16 17 client.send(cmd.encode('utf-8')) #编码(str->bytes) 18 19 #解决粘包 20 length=client.recv(buffer_size) 21 client.send(b'ready') 22 23 length=int(length.decode('utf-8')) #解码(bytes->str) 24 25 recv_size=0 26 recv_msg=b'' 27 while recv_size < length: 28 recv_msg += tcp_client.recv(buffer_size) 29 recv_size=len(recv_msg) #1024 30 31 print('命令的执行结果是 ',recv_msg.decode('gbk')) #转换编码(gbk->unicode)(转换显示中文的字符串) 32 client.close()
low的原因:程序的运行速度远快于网络传输速度,所以在发送一段字节前,先用send去发送该字节流长度,这种方式会放大网络延迟带来的性能损耗
第二种解决方案:(NB)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 from socket import * #由于 socket 模块中有太多的属性。我们在这里破例使用了'from module import *'语句。使用 'from socket import *',我们就把 socket模块里的所有属性都带到我们的命名空间里了,这样能 大幅减短我们的代码。 5 import subprocess 6 ip_port=('127.0.0.1',8080) 7 back_log=5 8 buffer_size=1024 9 10 server=socket(AF_INET,SOCK_STREAM) 11 server.bind(ip_port) 12 server.listen(back_log) 13 14 while True: #链接循环 15 conn,addr=server.accept() 16 print('新的客户端链接',addr) 17 while True: #通信循环 18 19 ##//收数据//## 20 try: 21 cmd=conn.recv(buffer_size) 22 if not cmd:break 23 print('收到客户端的命令',cmd) 24 25 #执行命令,得到命令的运行结果cmd_res 26 27 #subprocess模块提供了一种一致的方法来创建和处理附加进程,与标准库中的其它模块相比,提供了一个更高级的接口。用于替换如下模块:os.system() , os.spawnv() , os和popen2模块中的popen()函数,以及 commands(). 28 res=subprocess.Popen(cmd.decode('utf-8'),shell=True, #解码(bytes->str) 29 stderr=subprocess.PIPE, 30 stdout=subprocess.PIPE, 31 stdin=subprocess.PIPE) 32 33 err=res.stderr.read() 34 if err: 35 cmd_res=err 36 else: 37 cmd_res=res.stdout.read() ##编码是以当前所在的系统为准的,如果是windows,那么res.stdout.read()读出的就是GBK编码的,在接收端需要用GBK解码且只能从管道里读一次结果 38 39 ##//发数据//## 40 if not cmd_res: 41 cmd_res='执行成功'.encode('gbk') #转换编码(gbk->unicode)(转换显示中文的字符串) 42 43 length=len(cmd_res) 44 conn.send(str(length).encode('utf-8')) #编码(str->bytes) 45 client_ready=conn.recv(buffer_size) 46 if client_ready == b'ready': 47 conn.send(cmd_res) 48 except Exception as e: 49 print(e) 50 break
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 from socket import * 5 import struct ##NB##//struct模块可以把一个类型,如数字,转成固定长度的bytes 6 from functools import partial ##NB## 7 ip_port=('127.0.0.1',8080) 8 back_log=5 9 buffer_size=1024 10 11 client=socket(AF_INET,SOCK_STREAM) 12 client.connect(ip_port) 13 14 while True: 15 cmd=input('>>: ').strip() 16 if not cmd:continue 17 if cmd == 'quit':break 18 19 client.send(cmd.encode('utf-8')) #编码(str->bytes) 20 21 #解决粘包 22 length_data=tcp_client.recv(4)##NB##为字节流加上自定义固定长度报头,报头中包含字节流长度, 23 #然后一次send到对端,对端在接收时,先从缓存中取出定长的报头,然后再取真实数据 24 length=struct.unpack('i',length_data)[0] ##NB## 25 length=int(length.decode('utf-8')) #解码(bytes->str) 26 27 recv_size=0 28 recv_msg=b'' 29 while recv_size < length: 30 recv_msg += tcp_client.recv(buffer_size) 31 recv_size=len(recv_msg) #1024 32 33 print('命令的执行结果是 ',recv_msg.decode('gbk')) #转换编码(gbk->unicode)(转换显示中文的字符串) 34 client.close()
第二种方案的引申:(NBST)
1 import json,struct 2 #假设通过客户端上传1T:1073741824000的文件a.txt 3 4 #为避免粘包,必须自定制报头 5 header={'file_size':1073741824000,'file_name':'/a/b/c/d/e/a.txt','md5':'8f6fbf8347faa4924a76856701edb0f3'} #1T数据,文件路径和md5值 6 7 #为了该报头能传送,需要序列化并且转为bytes 8 head_bytes=bytes(json.dumps(header),encoding='utf-8') #序列化并转成bytes,用于传输 9 10 #为了让客户端知道报头的长度,用struck将报头长度这个数字转成固定长度:4个字节 11 head_len_bytes=struct.pack('i',len(head_bytes)) #这4个字节里只包含了一个数字,该数字是报头的长度 12 13 #客户端开始发送 14 conn.send(head_len_bytes) #先发报头的长度,4个bytes 15 conn.send(head_bytes) #再发报头的字节格式 16 conn.sendall(文件内容) #然后发真实内容的字节格式 17 18 #服务端开始接收 19 head_len_bytes=s.recv(4) #先收报头4个bytes,得到报头长度的字节格式 20 x=struct.unpack('i',head_len_bytes)[0] #提取报头的长度 21 22 head_bytes=s.recv(x) #按照报头长度x,收取报头的bytes格式 23 header=json.loads(json.dumps(header)) #提取报头 24 25 #最后根据报头的内容提取真实的数据,比如 26 real_data_len=s.recv(header['file_size']) 27 s.recv(real_data_len)
1 #我们可以把报头做成字典,字典里包含将要发送的真实数据的详细信息,然后json序列化,然后用struck将序列化后的数据长度打包成4个字节(4个自己足够用了) 2 #发送时: 3 1.先发报头长度 4 2.再编码报头内容然后发送 5 3.最后发真实内容 6 7 #接收时: 8 1.先手报头长度,用struct取出来 9 2.根据取出的长度收取报头内容,然后解码,反序列化 10 3.从反序列化的结果中取出待取数据的详细信息,然后去取真实的数据内容
1 import socket,struct,json 2 import subprocess 3 phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 4 phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加 5 6 phone.bind(('127.0.0.1',8080)) 7 8 phone.listen(5) 9 10 while True: 11 conn,addr=phone.accept() 12 while True: 13 cmd=conn.recv(1024) 14 if not cmd:break 15 print('cmd: %s' %cmd) 16 17 res=subprocess.Popen(cmd.decode('utf-8'), 18 shell=True, 19 stdout=subprocess.PIPE, 20 stderr=subprocess.PIPE) 21 err=res.stderr.read() 22 print(err) 23 if err: 24 back_msg=err 25 else: 26 back_msg=res.stdout.read() 27 28 headers={'data_size':len(back_msg)} 29 head_json=json.dumps(headers) 30 head_json_bytes=bytes(head_json,encoding='utf-8') 31 32 conn.send(struct.pack('i',len(head_json_bytes))) #先发报头的长度 33 conn.send(head_json_bytes) #再发报头 34 conn.sendall(back_msg) #在发真实的内容 35 36 conn.close() 37 38 服务端:定制稍微复杂一点的报头
1 from socket import * 2 import struct,json 3 4 ip_port=('127.0.0.1',8080) 5 client=socket(AF_INET,SOCK_STREAM) 6 client.connect(ip_port) 7 8 while True: 9 cmd=input('>>: ') 10 if not cmd:continue 11 client.send(bytes(cmd,encoding='utf-8')) 12 13 head=client.recv(4) 14 head_json_len=struct.unpack('i',head)[0] 15 head_json=json.loads(client.recv(head_json_len).decode('utf-8')) 16 data_len=head_json['data_size'] 17 18 recv_size=0 19 recv_data=b'' 20 while recv_size < data_len: 21 recv_data+=client.recv(1024) 22 recv_size+=len(recv_data) 23 24 print(recv_data.decode('utf-8')) 25 #print(recv_data.decode('gbk')) #windows默认gbk编码 26 27 客户端
二、socketserver(多链接)
基于tcp的套接字,关键就是两个循环,一个链接循环,一个通信循环
socket:收到一个链接后直接进入通信循环,其他链接要等之前的那个链接通信循环结束后才能进入通信循环
socketserver:模块中分两大类:server类(解决链接问题)和request类(解决通信问题)
socketserver模块是标准库中很多服务器框架的基础。内部使用 IO多路复用 以及 “多线程” 和 “多进程” ,从而实现并发处理多个客户端请求的Socket服务端。即:每个客户端请求连接到服务器时,Socket服务端都会在服务器是创建一个“线程”或者“进程” 专门负责处理当前客户端的所有请求。
ThreadingTCPServer(多线程,真并发)实现的Soket服务器内部会为每个client创建一个 “线程”,该线程用来和客户端进行交互。
使用ThreadingTCPServer:
1、创建一个继承自 SocketServer.BaseRequestHandler 的类
2、类中必须定义一个名称为 handle 的方法
3、启动ThreadingTCPServer
1 from socket import * 2 import subprocess 3 import struct 4 ip_port=('127.0.0.1',8080) 5 back_log=5 6 buffer_size=1024 7 8 tcp_server=socket(AF_INET,SOCK_STREAM) 9 tcp_server.bind(ip_port) 10 tcp_server.listen(back_log) 11 12 while True: 13 conn,addr=tcp_server.accept() 14 print('新的客户端链接',addr) 15 while True: 16 #收 17 try: 18 cmd=conn.recv(buffer_size) 19 if not cmd:break 20 print('收到客户端的命令',cmd) 21 22 #执行命令,得到命令的运行结果cmd_res 23 res=subprocess.Popen(cmd.decode('utf-8'),shell=True, 24 stderr=subprocess.PIPE, 25 stdout=subprocess.PIPE, 26 stdin=subprocess.PIPE) 27 err=res.stderr.read() 28 if err: 29 cmd_res=err 30 else: 31 cmd_res=res.stdout.read() 32 33 #发 34 if not cmd_res: 35 cmd_res='执行成功'.encode('gbk') 36 37 length=len(cmd_res) 38 39 data_length=struct.pack('i',length) 40 conn.send(data_length) 41 conn.send(cmd_res) 42 except Exception as e: 43 print(e) 44 break
1 from socket import * 2 import struct 3 from functools import partial 4 ip_port=('127.0.0.1',8080) 5 back_log=5 6 buffer_size=1024 7 8 tcp_client=socket(AF_INET,SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 11 while True: 12 cmd=input('>>: ').strip() 13 if not cmd:continue 14 if cmd == 'quit':break 15 16 tcp_client.send(cmd.encode('utf-8')) 17 18 19 #解决粘包 20 length_data=tcp_client.recv(4) 21 length=struct.unpack('i',length_data)[0] 22 23 recv_size=0 24 recv_data=b'' 25 while recv_size < length: 26 recv_data+=tcp_client.recv(buffer_size) 27 recv_size=len(recv_data) 28 print('命令的执行结果是 ',recv_data.decode('gbk')) 29 tcp_client.close()
1 from socket import * 2 import struct 3 from functools import partial 4 ip_port=('127.0.0.1',8080) 5 back_log=5 6 buffer_size=1024 7 8 tcp_client=socket(AF_INET,SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 11 while True: 12 cmd=input('>>: ').strip() 13 if not cmd:continue 14 if cmd == 'quit':break 15 16 tcp_client.send(cmd.encode('utf-8')) 17 18 19 #解决粘包 20 length_data=tcp_client.recv(4) 21 length=struct.unpack('i',length_data)[0] 22 23 recv_size=0 24 recv_data=b'' 25 while recv_size < length: 26 recv_data+=tcp_client.recv(buffer_size) 27 recv_size=len(recv_data) 28 print('命令的执行结果是 ',recv_data.decode('gbk')) 29 tcp_client.close()
1 import socketserver 2 3 4 ''' 5 def __init__(self, request, client_address, server): 6 self.request = request 7 self.client_address = client_address 8 self.server = server 9 self.setup() 10 try: 11 self.handle() 12 finally: 13 self.finish() 14 15 ''' 16 17 class MyServer(socketserver.BaseRequestHandler): 18 19 def handle(self): 20 print('conn is: ',self.request) #conn 21 print('addr is: ',self.client_address) #addr 22 23 while True: 24 try: 25 #收消息 26 data=self.request.recv(1024) 27 if not data:break 28 print('收到客户端的消息是',data,self.client_address) 29 30 #发消息 31 self.request.sendall(data.upper()) 32 33 except Exception as e: 34 print(e) 35 break 36 37 if __name__ == '__main__': 38 s=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyServer) #多线程 39 # s=socketserver.ForkingTCPServer(('127.0.0.1',8080),MyServer) #多进程 40 41 # self.server_address = server_address 42 # self.RequestHandlerClass = RequestHandlerClass 43 print(s.server_address) 44 print(s.RequestHandlerClass) 45 print(MyServer) 46 print(s.socket) 47 print(s.server_address) 48 s.serve_forever()
1 from socket import * 2 ip_port=('192.168.12.63',8080) 3 back_log=5 4 buffer_size=1024 5 6 tcp_client=socket(AF_INET,SOCK_STREAM) 7 tcp_client.connect(ip_port) 8 9 while True: 10 msg=input('>>: ').strip() 11 if not msg:continue 12 if msg == 'quit':break 13 14 tcp_client.send(msg.encode('utf-8')) 15 16 data=tcp_client.recv(buffer_size) 17 print('收到服务端发来的消息:',data.decode('utf-8')) 18 19 tcp_client.close()
1 from socket import * 2 import struct 3 from functools import partial 4 ip_port=('192.168.12.63',8080) 5 back_log=5 6 buffer_size=1024 7 8 tcp_client=socket(AF_INET,SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 11 while True: 12 msg=input('>>: ').strip() 13 if not msg:continue 14 if msg == 'quit':break 15 16 tcp_client.send(msg.encode('utf-8')) 17 18 data=tcp_client.recv(buffer_size) 19 print('收到服务端发来的消息:',data.decode('utf-8')) 20 21 22 23 tcp_client.close()
源码剖析:
server类:
request类:
继承关系:
基于TCP的socketserver源码:
ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()
查找属性的顺序:ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer
1、实例化得到ftpserver,先找类ThreadingTCPServer的__init__,在TCPServer中找到,进而执行server_bind,server_active
2、找ftpserver下的serve_forever,在BaseServer中找到,进而执行self._handle_request_noblock(),该方法同样是在BaseServer中
3、执行self._handle_request_noblock()进而执行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),然后执行self.process_request(request, client_address)
4、在ThreadingMixIn中找到process_request,开启多线程应对并发,进而执行process_request_thread,执行self.finish_request(request, client_address)
5、上述四部分完成了链接循环,本部分开始进入处理通讯部分,在BaseServer中找到finish_request,触发我们自己定义的类的实例化,去找__init__方法,而我们自己定义的类没有该方法,则去它的父类也就是BaseRequestHandler中找....
源码分析总结:
基于tcp的socketserver我们自己定义的类中的
1、self.server即套接字对象
2、self.request即一个链接
3、self.client_address即客户端地址
基于udp的socketserver我们自己定义的类中的
self.request是一个元组(第一个元素是客户端发来的数据,第二部分是服务端的udp套接字对象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
self.client_address即客户端地址
SocketServer的ThreadingTCPServer之所以可以同时处理请求得益于 select 和 Threading 两个东西,其实本质上就是在服务器端为每一个客户端创建一个线程,当前线程用来处理对应客户端的请求,所以,可以支持同时n个客户端链接(长连接)。
三、线程
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
1、threading模块:建立在thread 模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading 模块通过对thread进行二次封装,提供了更方便的api来处理线程。
线程创建有2种方式:
①直接调用(主流写法)
1 import threading,time 2 def run(n): #定义每个线程要运行的函数 3 print("test...",n) 4 time.sleep(2) 5 if __name__ == '__main__': 6 t1 = threading.Thread(target=run,args=("t1",)) #生成一个线程实例 //要加“,”号 7 t2 = threading.Thread(target=run,args=("t2",)) #生成另一个线程实例 8 # 两个同时执行,然后等待两秒程序结束 9 t1.start()#启动线程 10 t2.start()#启动另一个线程 11 print(t1.getName()) #获取线程名 12 print(t2.getName()) 13 # 程序输出 14 # test... t1 15 # test... t2
②继承式调用(非主流写法)(了解)
1 import threading,time 2 class MyThread(threading.Thread): 3 def __init__(self,num): 4 # threading.Thread.__init__(self) 5 super(MyThread,self).__init__() 6 self.num =num 7 def run(self):#定义每个线程要运行的函数 8 print("running on number:%s" %self.num) 9 time.sleep(2) 10 if __name__ == '__main__': 11 # 两个同时执行,然后等待两秒程序结束 12 t1 = MyThread(1) 13 t2 = MyThread(2) 14 t1.start() 15 t2.start() 16 # 程序输出 17 # running on number:1 18 # running on number:2
2、join(等待线程):等待线程执行完后,其他线程再继续执行(串行)
1 import threading,time 2 def run(n,sleep_time): 3 print("test...",n) 4 time.sleep(sleep_time) 5 print("test...done", n) 6 if __name__ == '__main__': 7 t1 = threading.Thread(target=run,args=("t1",2)) 8 t2 = threading.Thread(target=run,args=("t2",3) 9 # 两个同时执行,然后等待t1执行完成后,主线程和子线程再开始执行 10 t1.start() 11 t2.start() 12 t1.join() # 等待t1 13 print("main thread") 14 # 程序输出 15 # test... t1 16 # test... t2 17 # test...done t1 18 # main thread 19 # test...done t2
3、Daemon(守护线程):守护进程,主程序执行完毕时,守护线程会同时退出,不管是否执行完任务
1 import threading,time 2 def run(n): 3 print('[%s]------running---- ' % n) 4 time.sleep(2) 5 print('--done--') 6 def main(): 7 for i in range(5): 8 t = threading.Thread(target=run, args=[i, ]) 9 t.start() 10 t.join(1) 11 print('starting thread', t.getName()) 12 m = threading.Thread(target=main, args=[]) 13 m.setDaemon(True) # 将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时, 14 # m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务 15 m.start() #注意:setDaemon一定在start之前设置 16 m.join(timeout=2) 17 print("---main thread done----") 18 # 程序输出 19 # [0]------running---- 20 # starting thread Thread-2 21 # [1]------running---- 22 # --done-- 23 # ---main thread done----
1 其它方法 2 # run(): 线程被cpu调度后自动执行线程对象的run方法 3 # start():启动线程活动。 4 # isAlive(): 返回线程是否活动的。 5 # getName(): 返回线程名。 6 # setName(): 设置线程名。 7 8 threading模块提供的一些方法: 9 # threading.currentThread(): 返回当前的线程变量。 10 # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 11 # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
4、Mutex(同步锁)(互斥锁):由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。(*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁)
1 import time 2 import threading 3 def addNum(): 4 global num # 在每个线程中都获取这个全局变量 5 print('--get num:', num) 6 time.sleep(1) 7 lock.acquire() # //修改数据前加锁// 8 num -= 1 # 对此公共变量进行-1操作 9 lock.release() # //修改后释放// 10 num = 100 # 设定一个共享变量 11 thread_list = [] 12 lock = threading.Lock() # //生成全局锁// 13 for i in range(100): 14 t = threading.Thread(target=addNum) 15 t.start() 16 thread_list.append(t) 17 for t in thread_list: # 等待所有线程执行完毕 18 t.join() 19 print('final num:', num)
注:GIL,无论启多少个线程,你有多少个cpu, Python在执行的时候在同一时刻只允许一个线程运行
5、死锁:在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁
RLock(递归锁) :解决死锁;为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
1 import threading, time 2 3 def run1(): 4 print("grab the first part data") 5 lock.acquire() 6 global num 7 num += 1 8 lock.release() 9 return num 10 11 def run2(): 12 print("grab the second part data") 13 lock.acquire() 14 global num2 15 num2 += 1 16 lock.release() 17 return num2 18 19 def run3(): 20 lock.acquire() 21 res = run1() 22 print('--------between run1 and run2-----') 23 res2 = run2() 24 lock.release() 25 print(res, res2) 26 27 if __name__ == '__main__': 28 num, num2 = 0, 0 29 lock = threading.RLock() 30 for i in range(10): 31 t = threading.Thread(target=run3) 32 t.start() 33 34 while threading.active_count() != 1: 35 print(threading.active_count()) 36 else: 37 print('----all threads done---') 38 print(num, num2)
6、Semaphore (信号量):Mutex 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release()时+1;计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release(); BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
1 import threading, time 2 def run(n): 3 semaphore.acquire() ### 4 time.sleep(1) 5 print("run the thread: %s " % n) 6 semaphore.release() ### 7 if __name__ == '__main__': 8 num = 0 9 semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行 10 for i in range(20): 11 t = threading.Thread(target=run, args=(i,)) 12 t.start() 13 while threading.active_count() != 1: 14 pass # print threading.active_count() 15 else: 16 print('----all threads done---') 17 print(num)
7、Event(事件):实现两个或多个线程间的交互
下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。
1 import threading,time 2 3 def light(): 4 count = 0 5 while True: 6 if count < 10: #红灯 7 print("