今日所学
一.ftp上传简单实例
二.socketsever的固定用法
三.验证合法性连接
1.ftp上传实例
这个题目是我们现在网络编程比较基础一点的题目
下面我们只写简单上传的代码
上传服务端的代码
import socket import struct import json sever=socket.socket() ip_port=('127.0.0.1',8008) sever.bind(ip_port) sever.listen() conn,addr=sever.accept() #先接受4个字节,为文件的信息的长度 struct_len_info=conn.recv(4) # 拿到字典的长度,(拿到的结果为元组,从元组中把数据拿出来) data_len=struct.unpack('i',struct_len_info)[0] # 然后根据拿到的长度,接收字典的字节类型 data_dict_bytes=conn.recv(data_len) # 将字节转换为json字符串类型 data_json=data_dict_bytes.decode('utf-8') # 将json字符串转换为字典 data_dict=json.loads(data_json) sum=0 #算出文件的路径 file_path='相对路径'+'\'+data_dict['file_name'] with open(file_path,mode='wb') as f: while sum<data_dict['file_size']: from_client_msg=conn.recv(1024) sum+=len(from_client_msg) f.write(from_client_msg)
上传客户端代码
import socket import os import struct import json client=socket.socket() cleint.connect(('127.0.0.1',8008)) #读取文件的大小(读出来的内容是文件的字节类型) file_size=os.path.getsize(r'相对路径') # 在发送真实的文件之前,先将文件的信息包装成字典发送给服务端(文件名字,文件大小,文件路径) file_info={ 'file_name':'XXX.XXX', 'file_size' : file_size, 'file_LuJing': '文件的路径' } #将字典序列化成json字符串 file_info_str=json.dumps(file_info) #将json字符串编码成字节 file_info_byte=file_info_str.encode('utf-8') #求出字典的字节长度 info_len=len(file_info_byte) # 打包 info_struct=struct.pack('i',info_len) # 将打包好的信息和长度发送给服务端 client.send(info_struct+file_info_byte) with open('你要发送的文件.xxx',mode='rb') as f: #如果你累加的长度小于文件的长度,让他继续执行 while sum<file_size: #每次读取的大小最大为1024 every_read=f.read(1024) sum+=len(every_read) #将读取到的数据发送给服务端 client.send(every_read)
注 : 上传和下载只是角色互换而已,上传为客户端向服务端中保存数据,而下载是客户端从服务端的内容中下载相应的文件.
2.socketsever的固定用法
话不多说,上代码
先上传服务端的代码
import socketsever
class 类名(socketsever.BaseRequestHandler):
# 注:这里的handle方法名是固定的,是不允许变的
def handle(self):
while 1:
# self.request相当于conn
from_client_msg=self.request.recv(1024)
print(from_client_msg.decode('utf-8'))
msg=input('服务端说:)
self.request.send(msg.encode('utf-8'))
if __name__='__main__':
ip_port=('127.0.0.1',8008)
sever=socketsever.ThreadingTCPSever(ip_port,类名)
sever.serve_forever()
以下是客户端的代码
import socket client=socket.socket() client.connect(('127.0.0.1',8008)) while 1: msg=input('客户端说>>>') if msg=='byebye': break client.send(msg.encode('utf-8')) from_sever_msg=client.recv(1024) print(from_sever_msg.decode('utf-8')) client.close()
3. 验证合法性连接的服务端
服务端:
from socket import * import hmac,os #秘钥 secret_key=b'Jedan has a big key!' def conn_auth(conn): print('开始验证新链接的合法性') msg=os.urandom(32)#生成一个32字节的随机字符串 conn.sendall(msg) h=hmac.new(secret_key,msg) digest=h.digest() respone=conn.recv(len(digest)) return hmac.compare_digest(respone,digest) def data_handler(conn,bufsize=1024): if not conn_auth(conn): print('该链接不合法,关闭') conn.close() return print('链接合法,开始通信') while True: data=conn.recv(bufsize) if not data:break conn.sendall(data.upper()) def server_handler(ip_port,bufsize,backlog=5): ''' 只处理链接 :param ip_port: :return: ''' tcp_socket_server=socket(AF_INET,SOCK_STREAM) tcp_socket_server.bind(ip_port) tcp_socket_server.listen(backlog) while True: conn,addr=tcp_socket_server.accept() print('新连接[%s:%s]' %(addr[0],addr[1])) data_handler(conn,bufsize) if __name__ == '__main__': ip_port=('127.0.0.1',9999) # 自己定义接收大小 bufsize=1024 server_handler(ip_port,bufsize)
客户端:
from socket import * import hmac,os secret_key=b'Jedan has a big key!' def conn_auth(conn): ''' 验证客户端到服务器的链接 :param conn: :return: ''' msg=conn.recv(32) h=hmac.new(secret_key,msg) digest=h.digest() conn.sendall(digest) def client_handler(ip_port,bufsize=1024): tcp_socket_client=socket(AF_INET,SOCK_STREAM) tcp_socket_client.connect(ip_port) conn_auth(tcp_socket_client) while True: data=input('>>: ').strip() if not data:continue if data == 'quit':break tcp_socket_client.sendall(data.encode('utf-8')) respone=tcp_socket_client.recv(bufsize) print(respone.decode('utf-8')) tcp_socket_client.close() if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 client_handler(ip_port,bufsize)