Python之路 Day7 Socket
Socket简介
socket通常也称作"套接字",用于描述IP地址和端口,是一个通信链的句柄,应用程序通常通过"套接字"向网络发出请求或者应答网络请求。
socket起源于Unix,而Unix/Linux基本哲学之一就是“一切皆文件”,对于文件用【打开】【读写】【关闭】模式来操作。socket就是该模式的一个实现,socket即是一种特殊的文件,一些socket函数就是对其进行的操作(读/写IO、打开、关闭)
socket和file的区别:
- file模块是针对某个指定文件进行【打开】【读写】【关闭】
- socket模块是针对 服务器端 和 客户端Socket 进行【打开】【读写】【关闭】
Socket_远程执行命令
Server(服务端):
import socket
import os
server=socket.socket()
server.bind(('localhost',9998))
server.listen()
print('开始监听:')
while True:
conn,addr=server.accept()
print('新连接: ',conn.getpeername())
while True:
try:
data=conn.recv(1024).decode()
except ConnectionResetError:
print('客户端已经断开')
break
result=os.popen(data).read()
result_size=str(len(result.encode()))
conn.send(result_size.encode())
print(conn.recv(1024).decode())
conn.send(result.encode("utf-8"))
server.close()
Client(客户端):
import socket
client=socket.socket()
client.connect(('localhost',9998))
while True:
cmd=input('>>: ').strip().encode()
if not cmd:continue
client.send(cmd)
result_size=client.recv(1024).decode()
client.send('data size: {}'.format(result_size).encode())
init_size=0
all_data=b''
while int(result_size) != init_size:
print(int(result_size),init_size)
data=client.recv(1024)
init_size+=len(data)
all_data+=data
else:
print(all_data.decode())
client.close()
Socket_发送文件,并做md5验证
Server(服务端):
import socket
import os
import hashlib
server=socket.socket()
server.bind(('localhost',9998))
server.listen()
print('开始监听:')
while True:
conn,addr=server.accept()
print('新连接: ',conn.getpeername())
while True:
data=conn.recv(1024).decode()
if len(data) == 0:
print('客户端已经断开')
break
cmd,filename=data.split()
if os.path.isfile(filename):
result_size=os.stat(filename).st_size
conn.send(str(result_size).encode())
print(conn.recv(1024).decode())
f=open(filename,"rb")
m=hashlib.md5()
print('开始传输')
for line in f:
m.update(line)
conn.send(line)
f.close()
conn.send(m.hexdigest().encode())
server.close()
Client(客户端):
import socket
import hashlib
client=socket.socket()
client.connect(('localhost',9998))
while True:
cmd=input('>>:').strip()
if not cmd:continue
if cmd.startswith("get"):
filename=cmd.split()[1]
client.send(cmd.encode())
result_size=client.recv(1024).decode()
print('file size: ',result_size)
client.send('data size: {}'.format(result_size).encode())
init_size=0
all_data=b''
m=hashlib.md5()
while int(result_size) > init_size:
if int(result_size)-init_size>1024:
size=1024
else:
size=int(result_size)-init_size
data=client.recv(size)
m.update(data)
init_size+=len(data) #此处一定不能写1024,你虽然写的是接收1024,但实际上可能接收的多或少于1024.
all_data+=data
print(int(result_size),init_size)
else:
server_md5=client.recv(1024).decode()
clinet_md5=m.hexdigest()
print(server_md5,clinet_md5)
if server_md5 == clinet_md5:
with open(filename+".bak",'wb') as f:
f.write(all_data)
else:
print('md5值不相等')
client.close()
SocketServer_同时处理多个请求
Server(服务端):
import socketserver
class MySockerServer(socketserver.BaseRequestHandler):
def handle(self):
while True:
try:
self.data=self.request.recv(1024).strip()
except ConnectionResetError:
print('{} 已经断开...'.format(self.client_address[:]))
break #使用break退出当前会话
print('{} wrote: '.format(self.client_address[0]))
self.request.sendall(self.data.upper())
if __name__ == '__main__':
server=socketserver.ThreadingTCPServer(('localhost',9999),MySockerServer)
server.serve_forever()
Socket注意事项
- 数据的传输必须是bytes类型
- send空数据时会卡住,在send前判断数据长度。
- send一次最多发32k数据。
- socket有一个缓冲区,当客户端接收不了那么多的数据时,剩下的数据就会保留在缓冲区中,客户端再执行命令时会优先返回上一次没有发送完的数据。如何解决,服务端发送大量数据,客户端如何接收。答:循环接收,数据发送前告诉客户端命令结果的长度。
- 处理粘包的方式:
动态设置接收的大小
和time.sleep(0.5)
,还有在两次send之间加上一次数据的传输
Socket更多功能
Socket地址簇
socket.AF_INET IPv4(默认)
socket.AF_INET6 IPv6
socket.AF_UNIX 只能够用于单一的Unix系统进程间通信
Socket类型
socket.SOCK_STREAM 流式socket , for TCP (默认)
socket.SOCK_DGRAM 数据报式socket , for UDP
socket.SOCK_RAW 原始套接字,普通的套接字无法处理ICMP、IGMP等网络报文,而SOCK_RAW可以;其次,SOCK_RAW也可以处理特殊的IPv4报文;此外,利用原始套接字,可以通过IP_HDRINCL套接字选项由用户构造IP头。
socket.SOCK_RDM 是一种可靠的UDP形式,即保证交付数据报但不保证顺序。SOCK_RAM用来提供对原始协议的低级访问,在需要执行某些特殊操作时使用,如发送ICMP报文。SOCK_RAM通常仅限于高级用户或管理员运行的程序使用。
Socket方法
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
1.sk.bind(address) #将套接字绑定到地址。address地址的格式取决于地址族。在AF_INET下,以元组(host,port)的形式表示地址。
2.sk.listen(backlog) #开始监听传入连接。backlog指定在拒绝连接之前,可以挂起的最大连接数量。backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5,这个值不能无限大,因为要在内核中维护连接队列。
3.sk.setblocking(bool) #是否阻塞(默认True),如果设置False,那么accept和recv时一旦无数据,则报错。
4.sk.accept() #接受连接并返回(conn,address),其中conn是新的套接字对象,可以用来接收和发送数据。address是连接客户端的地址。接收TCP 客户的连接(阻塞式)等待连接的到来
5.sk.connect(address) #连接到address处的套接字。一般,address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。
6.sk.connect_ex(address) #同上,只不过会有返回值,连接成功时返回 0 ,连接失败时候返回编码,例如:10061
7.sk.close() #关闭套接字
8.sk.recv(bufsize[,flag]) #接受套接字的数据。数据以字符串形式返回,bufsize指定最多可以接收的数量。flag提供有关消息的其他信息,通常可以忽略。
9.sk.recvfrom(bufsize[.flag]) #与recv()类似,但返回值是(data,address)。其中data是包含接收数据的字符串,address是发送数据的套接字地址。
10.sk.send(string[,flag]) #将string中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于string的字节大小。即:可能未将指定内容全部发送。
11.sk.sendall(string[,flag] #将string中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回None,失败则抛出异常。内部通过递归调用send,将所有内容发送出去。
12.sk.sendto(string[,flag],address #将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。该函数主要用于UDP协议。
13.sk.settimeout(timeout) #设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如 client 连接最多等待5s )
14.sk.getpeername() #返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)。
15.sk.getsockname() #返回套接字自己的地址。通常是一个元组(ipaddr,port)
16.sk.fileno() #套接字的文件描述符
每日练习: 开发一个支持多用户在线的FTP程序
1. 用户加密认证
2. 允许同时多用户登录
3. 每个用户有自己的家目录 ,且只能访问自己的家目录
4. 对用户进行磁盘配额,每个用户的可用空间不同
5. 允许用户在ftp server上随意切换目录
6. 允许用户查看当前目录下文件
7. 允许上传和下载文件,保证文件一致性
8. 文件传输过程中显示进度条
附加功能:支持文件的断点续传
代码如下:
FTPServer:
# Author:ZhuYuLiang
import socketserver
import json
import hashlib
import os
#用户名,密码,磁盘空间大小
user_db={
'zyl':['123456',41943040],
'wq':['zgkm123456',51943040],
}
class MyFTP(socketserver.BaseRequestHandler):
def handle(self):
'''
handle函数主要接收用户的信息,解析并调用其他函数。
:return:
'''
while True:
self.info=self.receive() #接收的信息为用户名和MD5密码
if not self.info:
break #如果用户断开的话这退出这个函数,等待下一个连接
print('A new conection {}'.format(self.client_address[:]))
self.info=json.loads(self.info)
authentication_result=self.authentication_User(self.info)
if authentication_result[0] == 200:
self.name=self.info['name']
self.send(str(authentication_result).encode()) #返回认证代码
while True:
self.choice = self.receive() #接收用户的命令
if not self.choice: break
self.choice=json.loads(self.choice)
cmd=self.choice['cmd']
parameter=self.choice['parameter']
fun=getattr(self,cmd)
print(self.choice)
if cmd == 'put':
target_dir=self.choice['target_dir']
fun(parameter,target_dir)
elif cmd == 'get':
position=self.choice['position']
fun(parameter,position)
else:
fun(parameter)
else:
self.send(str(authentication_result).encode()) #返回认证代码
continue
def receive(self,size=1024):
'''
此函数专门用于接收用户请求,返回数据
:return:
'''
try:
self.data = self.request.recv(size).strip()
if not self.data:
raise ConnectionResetError
else:
return self.data.decode()
except ConnectionResetError:
print('{} 已经断开'.format(self.client_address[:]))
return False
def send(self,*args):
'''
此函数专门用来发送数据到客户端。
:param args:
:return:
'''
try:
self.request.send(args[0])
return True
except ConnectionResetError:
print('{} 已经断开'.format(self.client_address[:]))
return False
def cd(self,parameter):
'''
此函数主要功能判断用户要切换的目录是否存在
:param parameter:
:return:
'''
if os.path.isdir(parameter):
self.send(str(200).encode())
else:
self.send(str(256).encode())
def ls(self,parameter):
'''
此函数主要判断用户要切换的目录是否存在
:param parameter:
:return:
'''
self.data=os.listdir(parameter)
self.send(str(self.data).encode())
def get(self,parameter,position):
'''
此函数用来下载文件给客户端。
:param parameter:
:param position:
:return:
'''
m=hashlib.md5()
if os.path.isfile(parameter):
result_size = os.stat(parameter).st_size-int(position)
self.send(str(result_size).encode())
print(self.receive())
with open(parameter,'rb') as f:
f.seek(int(position),0)
for line in f:
m.update(line)
if not self.send(line):
return False
self.send(m.hexdigest().encode())
print(m.hexdigest().encode())
else:
print(parameter)
self.send(str(255).encode())
def put(self,parameter,*args):
'''
此函数用户客户端上传代码到服务端,数据支持断点续传。
:param parameter:
:param args:
:return:
'''
m = hashlib.md5()
if os.path.isfile("{}/{}".format(args[0],parameter)):
file_size=os.stat("{}/{}".format(args[0], parameter)).st_size
info=(200,file_size)
else:
info=(255,)
self.send(json.dumps(info).encode())
file_size = self.receive()
if user_db[self.name][1] > int(file_size) and os.path.isdir(args[0]):
self.send('True'.encode())
init_size = 0
while int(file_size) > init_size:
if int(file_size) - init_size > 1024:
size = 1024
else:
size = int(file_size) - init_size
try:
data=self.request.recv(size)
except ConnectionResetError:
print('{} 已经断开'.format(self.client_address[:]))
return False
m.update(data)
with open("{}/{}".format(args[0], parameter), 'ab') as f:
f.write(data)
user_db[self.name][1] -= int(len(data))
init_size+=len(data)
else:
server_md5 = self.receive()
clinet_md5 = m.hexdigest()
if server_md5 != clinet_md5:
print('md5值不相等')
os.remove("{}/{}".format(args[0], parameter))
else:
self.send('False'.encode())
def mkdir(self,dir_name):
os.mkdir(dir_name)
def authentication_User(self,user_info,*args,**kwargs):
'''
此方法用于用户身份认证,传入用户信息,判断用户目录是否存在不存在则创建。
:param user_info:
:return:
'''
Client_name=user_info['name']
Client_passwd=user_info['passwd']
if Client_name in user_db:
m = hashlib.md5()
m.update(user_db[Client_name][0].encode())
Server_passwd=m.hexdigest()
if Client_passwd == Server_passwd:
if not os.path.isdir(Client_name):
self.mkdir(Client_name)
return 200,user_db[Client_name][1]
else:
return 253
else:
return 253
if __name__ == '__main__':
server=socketserver.ThreadingTCPServer(('localhost',9999),MyFTP)
print(" 33[1;32;1m您的FTP服务器已经运行起来了.. 33[0m")
server.serve_forever()
FTPClient:
# Author:ZhuYuLiang
import socket
import hashlib
import json
import os
#验证码信息
STATUS_CODE = {
251:"Invalid cmd",
252:"Invalid auth data",
253:"Wrong username or password",
254:"Passed authentication",
255:"file doesn't exist on server",
256:"dir doesn't exist on server",
257:"ready to send file",
258:"md5 verification",
259:"No permission to switch to this directory",
200:"Authentication is successful"
}
class ClinetSocket(object):
'''
每个用户新实例化一次ClientSocket类,初始需要传入服务端主机ip和连接端口
'''
def __init__(self,host,port,*args,**kwargs):
self.clinet = socket.socket()
self.clinet.connect((host, port))
def authentication_User(self,name,passwd,*args,**kwargs):
'''
此函数用于认证用户,发送的密码为MD5值,服务端比对后返回结果。
:param name: 用户名
:param passwd: 密码
:return:
'''
m = hashlib.md5()
self.name = name
self.passwd = m.update(passwd.encode())
self.user_info = {
'name': self.name,
'passwd': m.hexdigest()
}
self.clinet.send(json.dumps(self.user_info).encode())
result = self.clinet.recv(1024).decode()
return result
def get(self, cmd, parameter, *args):
'''
此函数用来获取服务端的文件,数据支持断点续传,传输过程中有进度条。
:param cmd:
:param parameter:
:param args:
:return:
'''
Current_dir = args[0] # 当前目录,内存地址
Current_DirHierarchy = args[1] #当前目录层级,内存地址
m = hashlib.md5()
info={
'cmd':cmd,
'parameter':"{}/{}".format(Current_dir[0],parameter),
'position':0
}
if os.path.isfile(parameter.split('/')[-1]):
f = open(parameter.split('/')[-1], 'rb')
f.seek(0, 2)
info['position']=f.tell()
f.close()
self.clinet.send(json.dumps(info).encode())
file_size = self.clinet.recv(1024).decode() # 返回文件大小
if int(file_size) == 255:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[255]))
return False
else:
self.clinet.send('我已经准备好接收了'.encode()) #为防止黏包,穿插一次网络I/O
#开始接收
init_size = 0
while int(file_size) > init_size:
if int(file_size) - init_size > 1024:
size = 1024
else:
size = int(file_size) - init_size
data = self.clinet.recv(size)
init_size += len(data)
m.update(data)
recv_per = int(100 * init_size / int(file_size))
self.progress_bar(recv_per, width=50)
with open(parameter.split('/')[-1], 'ab') as f:
f.write(data)
else:
print(int(file_size), init_size)
server_md5 = self.clinet.recv(1024).decode()
clinet_md5 = m.hexdigest()
if server_md5 == clinet_md5:
print('接收完毕')
else:
print('md5值不相等')
os.remove(parameter.split('/')[-1])
def put(self,cmd,source_file,target_dir,*args):
'''
此函数用来上传本地文件,数据支持断点续传,传输过程中有进度条。
:param cmd:
:param source_file: 本地文件
:param target_dir: 服务器的目录
:param args:
:return:
'''
Current_dir = args[0] # 当前目录,内存地址
Current_DirHierarchy = args[1] #当前目录层级,内存地址
m=hashlib.md5()
info={
'cmd':cmd,
'parameter':source_file,
'target_dir':target_dir,
}
if os.path.isfile(source_file):
self.clinet.send(json.dumps(info).encode())
code=json.loads(self.clinet.recv(1024).decode())
if code[0] == 200:
position = code[1]
result_size = os.stat(source_file).st_size - position
else:
position=0
result_size = os.stat(source_file).st_size
self.clinet.send(str(result_size).encode())
result=self.clinet.recv(1024).decode()
init_size=0
print(result)
if eval(result):
with open(source_file,'rb') as f:
f.seek(position, 0)
for line in f:
m.update(line)
self.clinet.send(line)
init_size+=len(line)
recv_per = int(100 * init_size / int(result_size))
self.progress_bar(recv_per, width=50)
print('传输完毕')
self.clinet.send(m.hexdigest().encode())
else:
print('你的磁盘空间不足,或目标目录不存在')
else:
print('文件不存在')
def progress_bar(self,percent,width=50):
'''文件传输进度打印功能'''
if percent >= 100:
percent = 100
show_str = ('[%%-%ds]' % width) % (int(width * percent / 100) * "#")
#字符串拼接的嵌套使用,'-'代表左对齐,width代表宽度。
print('
%s %d%%' % (show_str, percent), end='')
def ls(self, cmd, parameter,*args):
'''
此函数用于查看目录下的文件。
:param cmd:
:param parameter:
:return:
'''
Current_dir = args[0] # 当前目录,内存地址
Current_DirHierarchy = args[1] #当前目录层级,内存地址
info = {
'cmd': cmd,
'parameter': None
}
if parameter == '.':
info['parameter']=Current_dir[0]
elif parameter == '..':
if Current_DirHierarchy[0] - 1 >= 0:
info['parameter'] = ''.join(Current_dir[0].split('/')[:-1])
else:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[259]))
return
elif parameter == '/':
info['parameter'] = Current_dir[0].split('/')[0]
else:
info['parameter']="{}/{}".format(Current_dir[0],parameter)
self.clinet.send(json.dumps(info).encode())
file_list=eval(self.clinet.recv(1024).decode())
for line in file_list:
print(line, end=' ')
print()
return True
def cd(self,cmd,parameter,*args):
'''
此函数用户切换用户的目录,没有与服务端交互,只是改变了用户的显示目录
:param cmd: 命令
:param parameter: 目标目录
:param args: args[0][0]当前目录,args[1][0]当前目录层级
:return:
'''
Current_dir = args[0] # 当前目录,内存地址
Current_DirHierarchy = args[1] #当前目录层级,内存地址
if parameter == '.':
pass
elif parameter == '..':
if Current_DirHierarchy[0]-1 >= 0:
Current_dir[0]="{}".format(''.join(Current_dir[0].split('/')[:-1])) #删除当前目录最后一级
else:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[259]))
elif parameter == '/':
Current_dir[0]=Current_dir[0].split('/')[0]
else:
info={
'cmd':cmd,
'parameter':"{}/{}".format(Current_dir[0],parameter)
}
self.clinet.send(json.dumps(info).encode())
code=self.clinet.recv(1024).decode()
if int(code) == 200:
Current_dir[0] = "{}/{}".format(Current_dir[0], parameter)
else:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[int(code)]))
Current_DirHierarchy[0]=Current_dir[0].count('/') #根据当前目录包含“/”的个数来确定目录层级
@property
def hele(self):
print(''' 33[1m;36;1m
ls: "."代表当前目录,".."代表父级目录,"/",代表家目录
cd: "."代表当前目录,".."代表父级目录,"/",代表家目录
get: 示例: get 服务端文件 #只能下载到当前目录
put: 示例: put 本地文件 服务端目录 #自能将当前目录文件上传到服务端
33[0m''')
def __del__(self):
self.clinet.close()
def Instantiate_Client():
'''
此函数用于初始化连接
:return:
'''
while True:
info=input('请输入服务端的IP和端口,以空格分开: ')
if len(info.split()) != 2:
print(' 33[31;1m输入错误请重试 33[0m')
continue
else:
host=info.split()[0]
port=int(info.split()[1])
try:
Client = ClinetSocket('localhost', 9999)
except ConnectionRefusedError:
print(' 33[31;1m主机或端口错误请重试! 33[0m')
continue
else:
print(' 33[36;1m连接服务端成功 33[0m')
return Client
def main():
'''
此函数是主函数,功能是:生成用户实例,认证用户,解析用户指令并执行。
:return:
'''
print(' 33[1;36;1m欢迎使用朱玉亮编写的FTP客户端 33[0m'.center(50,"+"))
Client = Instantiate_Client()
count = 0
while count < 3:
user = input('请输入账号: ')
passwd = input('请输入密码: ')
result = eval(Client.authentication_User(user, passwd))
if result[0] == 200:
print(' 33[36;1m{} 33[0m'.format(STATUS_CODE[result[0]]))
print('您现在的磁盘空间额度为: 33[36;1m{}MB 33[0m'.format(result[1]/1024/1024))
break
else:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[result[0]]))
count+=1
else:
print(' 33[31;1m多次输入错误 33[0m')
exit()
UserHomeDir_DirHierarchy = [user,0] #初始的家目录路径和目录层数
Current_dir = [UserHomeDir_DirHierarchy[0]] # 当前目录
Current_DirHierarchy = [UserHomeDir_DirHierarchy[1]] # 当前目录层级
while True:
all_cmd = input('[{}]: '.format(Current_dir[0])).strip()
if not all_cmd:
continue
cmd = all_cmd.split()[0]
if cmd == 'exit': #命令如果为'exit',则退出
break
elif hasattr(Client, cmd):
func = getattr(Client, cmd) #将用户输入的命令,转成对应的方法。
if cmd == 'ls' or cmd == 'cd':
if len(all_cmd.split()) == 1:
parameter='.' #用户没有输入的话,默认为当前目录
elif len(all_cmd.split()) == 2:
parameter=all_cmd.split()[1]
else:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[251]))
continue
func(cmd, parameter, Current_dir, Current_DirHierarchy)
elif cmd == 'get':
if len(all_cmd.split()) == 2:
parameter = all_cmd.split()[1]
else:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[251]))
continue
func(cmd, parameter, Current_dir, Current_DirHierarchy)
elif cmd == 'put':
if len(all_cmd.split()) == 3:
source_file = all_cmd.split()[1]
if all_cmd.split()[2] == '.':
target_dir = Current_dir[0]
elif all_cmd.split()[2] == '..':
if Current_DirHierarchy[0] - 1 >= 0:
target_dir=''.join(Current_dir[0].split('/')[:-1])
else:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[259]))
return
else:
target_dir=all_cmd.split()[2]
else:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[251]))
continue
func(cmd,source_file,target_dir,Current_dir,Current_DirHierarchy)
elif cmd == 'help':
Client.hele
else:
print(' 33[31;1m{} 33[0m'.format(STATUS_CODE[251]))
continue
if __name__ == '__main__':
main()