需求
1. 使用 select 或 selectors 模块实现支持并发的简易版 FTP
2. 允许多用户并发上传下载文件
目录结构
ftp_server
├ bin                   # 执行文件目录
|   └ ftp_server.py     # 执行程序
├ conf                  # 配置文件目录
|   ├ setting           # 配置文件。目前主要保存服务端sock和数据存储空间地址
|   └ init_setting.py   # 配置文件格式化程序
├ core                  # 程序核心代码位置
|   └ main.py           # 主逻辑交互程序
└ storage               # 服务端的数据存储空间

ftp_client
├ bin                   # 执行文件目录
|   └ ftp_client.py     # 执行程序
├ conf                  # 配置文件目录
|   ├ setting           # 配置文件。目前主要保存服务端sock和客户端文件下载目录地址
|   └ init_setting.py   # 配置文件格式化程序
├ core                  # 程序核心代码位置
|   └ main.py           # 主逻辑交互程序
└ download              # 客户端文件下载目录
代码
ftp_server
"""Launcher script for the FTP server.

Puts the project root (the parent of this script's directory) at the front
of ``sys.path`` so that the ``core`` package can be imported, then hands
control to ``core.main.main()``.
"""
import os
import sys

# Directory containing this script; the project root is one level above it.
script_dir = os.path.dirname(os.path.abspath(__file__))
basepath = os.path.dirname(script_dir)
sys.path.insert(0, basepath)

from core import main  # noqa: E402  (must come after the sys.path tweak)

main.main()
"""Generate the server configuration file ``setting``.

The file is written next to this script rather than into the current
working directory, so the result does not depend on where the script is
launched from.
"""
import configparser
import os

# Absolute path of the generated file: same directory as this script.
setting_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'setting')

c = configparser.ConfigParser()

c['server_info'] = {
    # configparser stores strings only; the (host, port) tuple is written as
    # its repr and parsed back into a tuple by the server at start-up.
    'server_address': str(('0.0.0.0', 12345)),
    # Name of the storage directory, resolved by the server relative to the
    # project root.
    'storage_dir': 'storage',
}

with open(setting_path, 'w', encoding='utf-8') as f:
    c.write(f)
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# Author:Jailly
"""Selectors-based FTP server.

A single-threaded event loop (``selectors``) serves many clients at once.
Each connection moves through a small state machine; the "state" is the
callback currently registered for the connection in the selector:

    write -> read -> (put1 -> put2 | get1 -> get2 -> get3) -> write -> ...

Wire protocol, as implemented by the handlers below:

* ``write`` sends the JSON-encoded list of stored files.
* ``read`` receives one command: ``put <name>``, ``get <name>``, ``skip``
  or ``exit``.
* Upload: server acks ``b'0'``; client sends the size; server acks
  ``b'0'``; client streams the file.
* Download: server answers ``b'0'`` (exists) or ``b'1'`` (missing); client
  acks; server sends the size; client acks; server streams the file.
"""

import ast
import configparser
import json
import os
import selectors
import socket

basepath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
setting_path = os.path.join(basepath, 'conf', 'setting')

# Fallbacks used when the setting file (or one of its keys) is missing.
DEFAULT_SERVER_ADDRESS = "('0.0.0.0', 12345)"
DEFAULT_STORAGE_DIR = 'storage'

# Maximum bytes read from a socket per recv() call.
BUFFER_SIZE = 1024
# Maximum bytes read from disk and offered to send() per write event.
SEND_CHUNK_SIZE = 8192


def get_setting(setting_path):
    """Parse the ini file at *setting_path* into a plain nested dict.

    Returns ``{section: {option: value}}`` with every value as a string.
    A missing file yields an empty dict, because ``ConfigParser.read``
    silently skips files it cannot open.
    """
    cf = configparser.ConfigParser()
    cf.read(setting_path, encoding='utf-8')
    return {section: dict(cf.items(section)) for section in cf.sections()}


setting_dict = get_setting(setting_path)
_server_info = setting_dict.get('server_info', {})
# The address is stored as the repr of a (host, port) tuple.  literal_eval
# accepts only Python literals, so a tampered config file cannot run code
# the way a plain eval() would allow.
server_address = ast.literal_eval(
    _server_info.get('server_address', DEFAULT_SERVER_ADDRESS))
storage_dir = os.path.join(
    basepath, _server_info.get('storage_dir', DEFAULT_STORAGE_DIR))


class FTPServer(object):
    """FTP server built on ``selectors``; holds all per-connection state."""

    def __init__(self, sock):
        """Create the listening socket and the empty bookkeeping tables.

        sock: the ``(host, port)`` address passed to ``bind`` by ``startup``.
        """
        self.sock = sock
        self.server = socket.socket()
        self.server.setblocking(False)
        self.selector = selectors.DefaultSelector()
        # All tables are keyed by the connection socket.
        self.file_path_dict = {}      # download: path of the requested file
        self.file_size_dict = {}      # announced size of the current transfer
        self.get_fd_dict = {}         # download: open source file object
        self.put_fd_dict = {}         # upload: open destination file object
        self.put_file_size_dict = {}  # upload: bytes received so far

    def startup(self):
        """Bind, listen and register the listening socket for accepts."""
        # os.listdir() in write() needs the directory to exist.
        os.makedirs(storage_dir, exist_ok=True)
        self.server.bind(self.sock)
        self.server.listen()
        self.selector.register(self.server, selectors.EVENT_READ, self.accept)

    def accept(self, server):
        """Accept a client and schedule sending it the file list."""
        conn, addr = server.accept()
        conn.setblocking(False)
        self.selector.register(conn, selectors.EVENT_WRITE, self.write)

    def _switch(self, conn, event, handler):
        """Move *conn* to its next state: wait for *event*, then call *handler*."""
        self.selector.modify(conn, event, handler)

    def _close(self, conn):
        """Drop *conn*: close its open files, forget its state, close the socket."""
        for fd_dict in (self.get_fd_dict, self.put_fd_dict):
            f = fd_dict.pop(conn, None)
            if f is not None:
                f.close()
        self.file_path_dict.pop(conn, None)
        self.file_size_dict.pop(conn, None)
        self.put_file_size_dict.pop(conn, None)
        try:
            self.selector.unregister(conn)
        except (KeyError, ValueError):
            # Not registered (any more) or already closed: nothing to undo.
            pass
        conn.close()

    def _storage_path(self, name):
        """Map a client-supplied file name to a path directly inside storage.

        Only the final path component is kept, so names such as
        ``../../etc/passwd`` or absolute paths cannot reach outside
        ``storage_dir``.
        """
        return os.path.join(storage_dir, os.path.basename(name))

    def write(self, conn):
        """Send the list of files in the storage directory, then await a command."""
        # NOTE(review): the client reads this with a single recv(1024); a
        # listing longer than that would need a length-prefixed protocol.
        file_list = os.listdir(storage_dir)
        conn.send(json.dumps(file_list).encode())
        self._switch(conn, selectors.EVENT_READ, self.read)

    def read(self, conn):
        """Receive one command and enter the matching upload/download state."""
        cmd = conn.recv(BUFFER_SIZE).decode('utf-8')
        parts = cmd.split()

        if len(parts) == 2:
            action, name = parts
            file_path = self._storage_path(name)

            if action == 'put':
                # Ack so the client's next message (the size) is not glued
                # to the command it has just sent (c:1).
                conn.send(b'0')

                if os.path.isfile(file_path):
                    # Never overwrite: pick "name(N).ext" instead.
                    file_path = self.rename(file_path)
                self.put_fd_dict[conn] = open(file_path, 'wb')
                self.put_file_size_dict[conn] = 0
                self._switch(conn, selectors.EVENT_READ, self.put1)

            elif action == 'get':
                if os.path.isfile(file_path):
                    conn.send(b'0')  # 0: the file exists, transfer will start
                    self.file_path_dict[conn] = file_path
                    self._switch(conn, selectors.EVENT_READ, self.get1)
                else:
                    print('prepare to tell client the file that the user wanna to get does not exist')
                    conn.send(b'1')  # 1: the file does not exist
                    print('has informed the client ')
                    self._switch(conn, selectors.EVENT_WRITE, self.write)

        elif cmd == 'exit' or cmd == '':
            # '' is what recv() returns once the peer has closed the socket.
            self._close(conn)

        elif cmd == 'skip':
            self._switch(conn, selectors.EVENT_WRITE, self.write)

    def get1(self, conn):
        """Download step 1: consume the client's ack, then send the file size."""
        if not conn.recv(BUFFER_SIZE):  # ack keeps status and size apart (s:1)
            self._close(conn)           # empty read: the client went away
            return
        print('prepare to send file_size')
        file_path = self.file_path_dict[conn]
        file_size = os.stat(file_path).st_size
        self.file_size_dict[conn] = file_size

        conn.send(str(file_size).encode())
        print('send file_size ', file_size)

        self._switch(conn, selectors.EVENT_READ, self.get2)

    def get2(self, conn):
        """Download step 2: consume the second ack and open the file for sending."""
        if not conn.recv(BUFFER_SIZE):  # ack keeps size and data apart (s:2)
            self._close(conn)
            return
        print('send file start!')
        self.get_fd_dict[conn] = open(self.file_path_dict[conn], 'rb')
        # All payload bytes are sent by get3, and only when the socket
        # reports that it is writable.
        self._switch(conn, selectors.EVENT_WRITE, self.get3)

    def get3(self, conn):
        """Download step 3: send the next chunk; finish once the size is reached."""
        f = self.get_fd_dict[conn]
        file_size = self.file_size_dict[conn]
        remaining = file_size - f.tell()

        if remaining > 0:
            # Never send more than the announced size, even if the file grew.
            chunk = f.read(min(SEND_CHUNK_SIZE, remaining))
            if not chunk:
                # The file shrank after its size was announced; the promised
                # byte count can no longer be delivered.
                self._close(conn)
                return
            try:
                sent = conn.send(chunk)
            except BlockingIOError:
                sent = 0
            if sent < len(chunk):
                # A non-blocking send may accept only part of the buffer:
                # rewind so the unsent tail is read again on the next event.
                f.seek(sent - len(chunk), os.SEEK_CUR)

        if f.tell() >= file_size:
            del self.get_fd_dict[conn]
            del self.file_size_dict[conn]
            del self.file_path_dict[conn]
            f.close()
            self._switch(conn, selectors.EVENT_WRITE, self.write)

    def put1(self, conn):
        """Upload step 1: receive the file size and acknowledge it."""
        try:
            file_size = int(conn.recv(BUFFER_SIZE).decode('utf-8'))
        except ValueError:
            # Empty read (disconnect), undecodable bytes or a non-number.
            self._close(conn)
            return
        if file_size < 0:
            self._close(conn)
            return

        self.file_size_dict[conn] = file_size
        conn.send(b'0')  # ack so the size is not glued to the file data (c:2)

        if file_size == 0:
            # No payload will ever arrive; waiting in put2 would hang both
            # sides.
            # NOTE(review): this ack and the following file list could reach
            # the client in a single recv(); the 'get' not-found path shares
            # the same protocol limitation.
            self._finish_put(conn)
        else:
            self._switch(conn, selectors.EVENT_READ, self.put2)

    def put2(self, conn):
        """Upload step 2: append received bytes until the announced size is met."""
        remaining = self.file_size_dict[conn] - self.put_file_size_dict[conn]
        accepting_data = conn.recv(min(BUFFER_SIZE, remaining))
        if not accepting_data:
            # Peer closed mid-upload.  Without this check the selector would
            # report the socket readable forever.  The partial file is left
            # on disk.
            self._close(conn)
            return

        self.put_fd_dict[conn].write(accepting_data)
        self.put_file_size_dict[conn] += len(accepting_data)

        if self.put_file_size_dict[conn] >= self.file_size_dict[conn]:
            self._finish_put(conn)

    def _finish_put(self, conn):
        """Close the uploaded file, clear upload state and resend the file list."""
        del self.put_file_size_dict[conn]
        del self.file_size_dict[conn]
        self.put_fd_dict.pop(conn).close()
        self._switch(conn, selectors.EVENT_WRITE, self.write)

    def rename(self, file_path, times=1):
        """Return the first ``name(N).ext`` variant of *file_path* that is free.

        file_path: the path that is already taken.
        times: first value of N to try; N is incremented until no regular
            file exists at the candidate path.
        """
        f_name, f_extension = os.path.splitext(file_path)
        while True:
            new_file_path = '%s(%s)%s' % (f_name, times, f_extension)
            if not os.path.isfile(new_file_path):
                return new_file_path
            times += 1

    def monitor(self):
        """Run the event loop forever, dispatching each ready socket."""
        while 1:
            for key, event in self.selector.select():
                try:
                    key.data(key.fileobj)
                except BlockingIOError:
                    # Readiness was spurious; try again on the next event.
                    continue
                except OSError:
                    if key.fileobj is self.server:
                        raise
                    # One broken client (reset, disk error, ...) must not
                    # take the whole server down.
                    self._close(key.fileobj)


def main():
    """Create the server on the configured address and serve forever."""
    fs = FTPServer(server_address)
    fs.startup()
    fs.monitor()


if __name__ == '__main__':
    main()
ftp_client
"""Launcher script for the FTP client.

Puts the project root (the parent of this script's directory) at the front
of ``sys.path`` so that the ``core`` package can be imported, then hands
control to ``core.main.main()``.
"""
import os
import sys

# Directory containing this script; the project root is one level above it.
script_dir = os.path.dirname(os.path.abspath(__file__))
basepath = os.path.dirname(script_dir)
sys.path.insert(0, basepath)

from core import main  # noqa: E402  (must come after the sys.path tweak)

main.main()
"""Generate the client configuration file ``setting``.

The file is written next to this script rather than into the current
working directory, so the result does not depend on where the script is
launched from.
"""
import configparser
import os

# Absolute path of the generated file: same directory as this script.
setting_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'setting')

cf = configparser.ConfigParser()

cf['client_info'] = {
    # configparser stores strings only; the (host, port) tuple is written as
    # its repr and parsed back into a tuple by the client at start-up.
    'server_address': str(('127.0.0.1', 12345)),
    # Name of the download directory, resolved by the client relative to the
    # project root.
    'download_dir': 'download',
}

with open(setting_path, 'w', encoding='utf-8') as f:
    cf.write(f)
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 # Author:Jailly 4 5 import socket,os,configparser,json,sys 6 7 basepath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 8 setting_path = os.path.join(os.path.join(basepath, 'conf'), 'setting') 9 10 11 def get_setting(setting_path): 12 cf = configparser.ConfigParser() 13 cf.read(setting_path) 14 setting_dict = {} 15 for section in cf.sections(): 16 setting_dict[section] = {} 17 for key,value in cf.items(section): 18 setting_dict[section][key] = value 19 20 return setting_dict 21 22 23 setting_dict = get_setting(setting_path) 24 server_address = eval(setting_dict['client_info']['server_address']) 25 download_dir = os.path.join(basepath, setting_dict['client_info']['download_dir']) 26 27 28 class FTPClient(object): 29 30 def __init__(self,server_adress): 31 self.socket = socket.socket() 32 self.socket.connect(server_adress) 33 34 35 def rename(self,file_path,times=1): 36 f_name,f_extension = os.path.splitext(file_path) 37 f_name += '(%s)'%str(times) 38 new_file_path = f_name + f_extension 39 40 if os.path.isfile(new_file_path): 41 times += 1 42 return self.rename(file_path,times) 43 else: 44 return new_file_path 45 46 47 def get(self,file_path): 48 self.socket.send(b'0') # 配合服务端完成阻塞,以防止服务端粘包(s:1) 49 # print('preare to get file_size') 50 file_size = int(self.socket.recv(1024).decode('utf-8')) 51 # print('get file_size ',file_size) 52 self.socket.send(b'0') # 配合服务端完成阻塞,以防止服务端粘包(s:2) 53 54 file_path = os.path.join(download_dir,file_path) 55 # print('prepare to handle file_path') 56 real_file_path = self.rename(file_path) if os.path.isfile(file_path) else file_path 57 # print('finished handling file_path') 58 accepted_size = 0 59 last_accepted_size = 0 60 print('Downloading %s: '%os.path.basename(file_path),end='') 61 with open(real_file_path,'wb') as f: 62 while accepted_size < file_size: 63 if accepted_size >= file_size - 1024: 64 buffersize = file_size - accepted_size 65 else: 66 buffersize = 1024 67 68 
accepting_data = self.socket.recv(buffersize) 69 f.write(accepting_data) 70 accepted_size += len(accepting_data) 71 72 # 打印进度条 73 bar_num = (accepted_size - last_accepted_size) * 100 // file_size 74 75 if bar_num: 76 sys.stdout.write('#'*bar_num) 77 sys.stdout.flush() 78 last_accepted_size = accepted_size # 只有当前后2次的差达到文件大小的1%时,才为代表上一次传输大小的变量(last_accepted_size)赋值 79 80 print(' done') 81 82 def put(self,file_path): 83 self.socket.recv(1024) # 阻塞,以防止客户端粘包(c:1) 84 85 file_size = os.stat(file_path).st_size 86 self.socket.send(str(file_size).encode()) 87 88 self.socket.recv(1024) # 阻塞,以防止客户端粘包(c:2) 89 90 send_size = 0 91 last_send_size = 0 92 print('Uploading %s: '%os.path.basename(file_path),end = '') 93 with open(file_path,'br') as f: 94 for line in f: 95 self.socket.send(line) 96 send_size += len(line) 97 98 bar_num = (send_size - last_send_size)*100 // file_size 99 if bar_num: 100 sys.stdout.write('#'*bar_num) 101 sys.stdout.flush() 102 last_send_size = send_size 103 104 print(' done') 105 106 107 def interactive(self): 108 while 1: 109 # print('prepare to get file_list') 110 file_list = json.loads(self.socket.recv(1024).decode()) 111 print(' 服务器存储空间的文件列表: %s '%(' '.join([file for file in file_list]) if file_list 112 else '