需求:开发一个支持多用户同时在线的FTP程序
要求:
1、用户加密认证
2、允许同时多用户登录(用到并发编程的知识,选做)
3、每个用户有自己的家目录,且只能访问自己的家目录
4、对用户进行磁盘配额,每个用户的可用空间不同(选做)
5、允许用户在ftp server上随意切换目录
6、允许用户查看当前目录下的文件
7、允许上传和下载文件,并保证文件的一致性
8、文件传输过程中显示进度条
9、附加:支持文件的断点续传(选做)
开发的程序需符合PEP8开发规范,及专业的生产软件设计规范,包括目录、代码命名、功能接口等
client
confsettings
import os BASE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) DOWN_PATH = os.path.join(BASE_PATH, "download") UP_PATH = os.path.join(BASE_PATH, "upload") CODING = "utf-8" MAX_PACKET_SIZE = 8192
client
import os import sys import socket import struct import json import hashlib import shelve sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from conf import settings class MYClient: """ ftp客户端 """ address_family = socket.AF_INET socket_type = socket.SOCK_STREAM def __init__(self, server_address): self.server_address = server_address self.socket = socket.socket(self.address_family, self.socket_type) self.client_connect() self.username = None self.client_status = False self.terimal = None self.shelve_obj = shelve.open("db") self.server_file_path = None self.home_path = None def client_connect(self): """与服务器连接""" self.socket.connect(self.server_address) def read_file(self, path): """读取文件""" with open(path, "rb") as f: return f.read() def hash_md5(self, msg): """加密""" m = hashlib.md5() m.update(msg) # print(m.hexdigest()) return m.hexdigest() def header(self, status, **kwargs): """制作、发送报头""" header_dic = kwargs header_dic["status"] = status header_json = json.dumps(header_dic) header_bytes = header_json.encode(settings.CODING) self.socket.send(struct.pack("i", len(header_bytes))) # header_dic的大小传送给客户端 self.socket.send(header_bytes) # header_dic数据传送给客户端 def recv_header(self): """接收报头""" header = self.socket.recv(4) # 接收报头 header_size = struct.unpack("i", header)[0] header_bytes = self.socket.recv(header_size) # 接收报头信息 header_json = header_bytes.decode(settings.CODING) header_dic = json.loads(header_json) return header_dic def get(self, data): """ 下载 :param data: 指令、文件名、用户名 :return: """ if len(data) == 2: username = data[1] filename = input("请输入上传文件名:") else: username = data[2] filename = data[1] msg = {"action_type": "get", "filename": filename, "username": username, "s_file_path": self.server_file_path} self.socket.send(json.dumps(msg).encode(settings.CODING)) header_dic = self.recv_header() if header_dic["status"] == "200": self.socket.send(header_dic["status"].encode(settings.CODING)) c_file_path = os.path.join(os.path.join(settings.DOWN_PATH, msg["username"]), msg["filename"]) self.server_file_path = header_dic["s_file_path"] long = str(len(self.shelve_obj.keys())+1) while True: if long in self.shelve_obj.keys(): long = str(int(long) + 1) else: break if os.path.isfile(c_file_path): print("%s文件已存在" % filename) self.socket.send("000".encode(settings.CODING)) return else: self.socket.send("999".encode(settings.CODING)) self.shelve_obj[long] = { "filename": msg["filename"]+".download", "s_file_path": self.server_file_path+".download", "file_size": header_dic["file_size"] } with open("%s.download" % c_file_path, "wb")as f: # 接收数据 recv_size = 0 while recv_size < header_dic["file_size"]: line = self.socket.recv(settings.MAX_PACKET_SIZE) f.write(line) recv_size += len(line) self.progress_bar(recv_size, header_dic["file_size"]) f.close() os.rename("%s.download" % c_file_path, c_file_path) num = self.hash_md5(self.read_file(c_file_path)) if num == header_dic["md5"]: self.socket.send("999".encode(settings.CODING)) print("下载完成") del self.shelve_obj[long] else: self.socket.send("000".encode(settings.CODING)) print("文件下载出错") elif header_dic["status"] == "210": self.socket.send(header_dic["status"].encode(settings.CODING)) print(header_dic["status_msg"]) def resume(self): """ 断点续传 :return: """ if len(self.shelve_obj.keys()) == 0: return print("未传送完成文件".center(50, "-")) for k in self.shelve_obj.keys(): relative_path = self.shelve_obj[k]["s_file_path"].replace(self.home_path, "") print("序号:%s,文件名:%s,文件大小:%s,文件地址:%s" % (k, self.shelve_obj[k]["filename"], self.shelve_obj[k]["file_size"], relative_path)) while True: print("请输入继续传送文件的序号,退出请输“q”") choice = input(">>") if not choice: continue elif choice == "q": return elif choice.isdigit(): file_path = os.path.join(os.path.join(self.home_path, self.username), self.shelve_obj[choice]["s_file_path"]).rstrip(".download") filename = self.shelve_obj[choice]["filename"].rstrip(".download") complete_size = self.shelve_obj[choice]["file_size"] incomplete_size = os.path.getsize( os.path.join(os.path.join(settings.DOWN_PATH, self.username), self.shelve_obj[choice]["filename"])) header_dic = {"filename": filename, "s_file_path": file_path, "incomplete_size": incomplete_size} client_path = os.path.join(os.path.join(settings.DOWN_PATH, self.username), filename) header_dic["client_path"] = client_path msg = {"action_type": "resume", "filename": filename, "username": self.username} self.socket.send(json.dumps(msg).encode(settings.CODING)) if int(choice) > 0 and int(choice) <= len(self.shelve_obj.keys()): if self.socket.recv(3).decode(settings.CODING) == "999": status = "500" self.header(status, **header_dic) header_dic = self.recv_header() with open("%s.download" % header_dic["client_path"], "ab")as f: # 接受真实的数据 while incomplete_size < complete_size: line = self.socket.recv(settings.MAX_PACKET_SIZE) f.write(line) incomplete_size += len(line) self.progress_bar(incomplete_size, complete_size) f.close() os.rename("%s.download" % header_dic["client_path"], header_dic["client_path"]) num = self.hash_md5(self.read_file(header_dic["client_path"])) if num == header_dic["md5"]: self.socket.send("999".encode(settings.CODING)) print("下载完成") del self.shelve_obj[choice] else: self.socket.send("000".encode(settings.CODING)) print("文件下载出错") else: print("输入错误,请重新输入!") def put(self, data): """ 上传 :param data: :return: """ if len(data) == 2: username = data[1] filename = input("请输入上传文件名:") else: username = data[2] filename = data[1] c_file_path = os.path.join(os.path.join(settings.UP_PATH, username), filename) if os.path.isfile(c_file_path): msg = {"action_type": "put", "filename": filename, "username": username} self.socket.send(json.dumps(msg).encode(settings.CODING)) res = self.socket.recv(3).decode(settings.CODING) if res == "999": status = "200" file_size = os.path.getsize(c_file_path) header_dic = { "filename": data[1], "md5": self.hash_md5(self.read_file(c_file_path)), "file_size": file_size, "s_file_path": self.server_file_path } self.header(status, **header_dic) if self.socket.recv(3).decode(settings.CODING) == "000": ask = input("%s文件已存在,是否覆盖?" % filename) if ask == "n": self.socket.send("000".encode(settings.CODING)) return elif ask == "y": self.socket.send("999".encode(settings.CODING)) else: print("输入错误") self.socket.send("000".encode(settings.CODING)) return else: self.socket.send("999".encode(settings.CODING)) header_dic = self.recv_header() if header_dic["status"] == "300": send_size = 0 with open(c_file_path, "rb")as f: for line in f: self.socket.send(line) send_size += len(line) self.progress_bar(send_size, file_size) f.close() res = self.socket.recv(3).decode(settings.CODING) if res == "999": print("上传成功!") else: print("上传失败!") else: print(header_dic["status_msg"]) return else: return else: print("文件不存在") return def progress_bar(self, recv_size, file_size): """ 进度条 :param recv_size: 已接收大小 :param file_size: 总共大小 :return: """ rate = recv_size / file_size rate_num = int(rate * 100) number = int(50 * rate) r = ' [%s%s]%d%%' % ("#" * number, " " * (50 - number), rate_num,) print(" {}".format(r), end=" ") def login(self): """ 用户验证 :return: """ count = 0 while count < 3: username = input("请输入用户名:").strip() if not username: continue password = input("请输入密码:").strip() msg = {"action_type": "login", "username": username, "password": password} self.socket.send(json.dumps(msg).encode(settings.CODING)) header_dic = self.recv_header() if header_dic["status"] == "100": self.home_path = header_dic["home_path"] self.username = username print("登陆成功,欢迎%s" % username) self.terimal = "%s" % username return True elif header_dic["status"] == "110": print("用户名或密码错误") count += 1 # return False def run(self): """ 与服务器的所有交互 :return: """ if not self.username: self.client_status = self.login() if self.client_status: self.resume() while True: print("输入help可看帮助") user_input = input("%s,请输入命令:" % self.terimal).strip() if not user_input: continue data = user_input.split() cmd = data[0] data.append(self.username) # [get,1.mp3,username] # print(data) if hasattr(self, cmd): func = getattr(self, cmd) func(data) else: print("输入有误,请重新输入") def help(self, data): """ 帮助 :param data: :return: """ msg = {"get 文件名": "下载文件", "put 文件名": "上传文件", "dir": "查看当前路径", "cd 目标路径": "切换目录" } for k in msg: print("指令:“%s”,功能:%s" % (k, msg[k])) def dir(self, data): """ 查看当前目录 :param data: :return: """ msg = {"action_type": "dir", "username": data[1]} self.socket.send(json.dumps(msg).encode(settings.CODING)) msg_dic = self.recv_header() if msg_dic["status"] == "200": print(msg_dic["msg"]) else: print(msg_dic["status_msg"]) def cd(self, data): """ 切换目录 :param data: :return: """ if len(data) == 2: target = input("请输入切换到的目录:") else: target = data[1] msg = {"action_type": "cd", "target": target} self.socket.send(json.dumps(msg).encode(settings.CODING)) msg_dic = self.recv_header() if msg_dic["status"] == "400": print("目录切换成功") self.server_file_path = msg_dic["path"] # print(msg_dic["path"]) self.terimal = msg_dic["current_path"] else: print(msg_dic["status_msg"]) client = MYClient(('127.0.0.1', 8080)) client.run()
server
binserver
import os import sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) if __name__ == "__main__": from core import main from conf import settings ftp_server = main.MYServer(settings.server_address) ftp_server.run()
confsettings
import os BASE_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) CONF_PATH = os.path.join(BASE_PATH, "conf") SERVER_PATH = os.path.join(BASE_PATH, "core") HOME_PATH = os.path.join(BASE_PATH, "home") SERVER_ADDRESS = ("127.0.0.1", 8080) REQUEST_QUEUEST_SIZE = 5 MAX_PACKET_SIZE = 8192 CODING = "utf-8" ALLOW_REUSE_ADDRESS = False
用户初始化信息
import configparser import hashlib config = configparser.ConfigParser() config["alex"] = {} password = "abc123" n = hashlib.md5() n.update(password.encode("utf-8")) config["alex"]["name"] = "alex" config["alex"]["password"] = n.hexdigest() config["alex"]["quato"] = "5" config["egon"] = {} password = "efg456" m = hashlib.md5() m.update(password.encode("utf-8")) config["egon"]["name"] = "egon" config["egon"]["password"] = m.hexdigest() config["alex"]["quato"] = "3" config["jack"] = {} password = "hij789" l = hashlib.md5() l.update(password.encode("utf-8")) config["jack"]["name"] = "jack" config["jack"]["password"] = l.hexdigest() config["alex"]["quato"] = "2.5" with open("config.ini", "w")as f: config.write(f)
coremain
import socket import os import json import hashlib import configparser import struct import subprocess from conf import settings class MYServer(object): """ ftp服务端 """ address_family = socket.AF_INET socket_type = socket.SOCK_STREAM STATUS = { "100": "用户验证成功!", "110": "用户名或密码错误!", "200": "文件存在", "210": "文件不存在", "300": "存储空间足够", "310": "存储空间不足", "400": "路径存在", "410": "路径不存在", "500": "文件续传", "999": "文件传输成功", "000": "文件传输失败" } def __init__(self, server_address): self.server_address = server_address self.socket = socket.socket(self.address_family, self.socket_type) self.server_bind() self.server_listen() self.user_current_dir = "" self.file_size = 0 def server_bind(self): """ 绑定 :return: """ if settings.ALLOW_REUSE_ADDRESS: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) def server_listen(self): """ 监听 :return: """ self.socket.listen(settings.REQUEST_QUEUEST_SIZE) def run(self): """ 建立连接,启动socket server :return: """ while True: self.conn, self.client_addr = self.socket.accept() try: self.manage() except ConnectionRefusedError: print("客户端发生错误,断开连接") self.socket.close() def manage(self): """ 处理与用户的所有指令交互 :return: """ while True: data = self.conn.recv(settings.MAX_PACKET_SIZE) # 接收客户端指令 if not data: print("连接断开... ") del self.conn, self.client_addr break cmd_data = json.loads(data.decode(settings.CODING)) action_type = cmd_data["action_type"] if action_type: if hasattr(self, action_type): func = getattr(self, action_type) func(cmd_data) else: print("未接收到有效指令") def header(self, status, **kwargs): """ 制作、发送报头 :param status: 状态码 :param kwargs: :return: """ header_dic = kwargs header_dic["status"] = status header_dic["status_msg"] = self.STATUS[status] header_dic["home_path"] = settings.HOME_PATH # print(header_dic) header_json = json.dumps(header_dic) header_bytes = header_json.encode(settings.CODING) self.conn.send(struct.pack("i", len(header_bytes))) # header_dic的大小传送给客户端 self.conn.send(header_bytes) # header_dic数据传送给客户端 def recv_header(self): """ 接收报头 :return: """ header = self.conn.recv(4) # 接收报头 header_size = struct.unpack("i", header)[0] header_bytes = self.conn.recv(header_size) # 接收报头信息 header_json = header_bytes.decode(settings.CODING) header_dic = json.loads(header_json) return header_dic def read_info(self): """ 加载所有账户信息 :return: """ conf = configparser.ConfigParser() conf.read(r"%s/%s" % (settings.CONF_PATH, "config.ini")) return conf def login(self, data): """ 用户登陆验证 :param data: 指令、用户名、密码 :return: """ username = data["username"] password = data["password"] conf = self.read_info() psd = self.hash_md5(password.encode(settings.CODING)) if username in conf: if psd == conf[username]["password"]: print("认证成功") self.header("100") self.user_current_dir = os.path.join(settings.HOME_PATH, username) return True else: self.header("110") print("认证失败") return False else: self.header("110") print("认证失败") return False def read_file(self, path): """ 打开文件 :param path: 文件路径 :return: """ with open(path, "rb") as f: return f.read() def hash_md5(self, msg): """ md5加密 :param msg: 加密信息 :return: """ m = hashlib.md5() m.update(msg) # print(m.hexdigest()) return m.hexdigest() def get(self, data): """ 下载 :param data: 指令、文件名,用户名、服务器路径 :return: """ if data["s_file_path"] is None: file_path = os.path.join(os.path.join(settings.HOME_PATH, data["username"]), data["filename"]) else: file_path = os.path.join(data["s_file_path"], data["filename"]) if os.path.isfile(file_path): status = "200" self.file_size = os.path.getsize(file_path) header_dic = { "filename": data["filename"], "md5": self.hash_md5(self.read_file(file_path)), "file_size": self.file_size, "s_file_path": file_path } self.header(status, **header_dic) else: status = "210" self.header(status) if self.conn.recv(3).decode(settings.CODING) == "200": if self.conn.recv(4).decode(settings.CODING) == "999": send_size = 0 with open(file_path, "rb")as f: for line in f: self.conn.send(line) send_size += len(line) self.progress_bar(send_size, self.file_size) f.close() res = self.conn.recv(4).decode(settings.CODING) if res == "999": print("下载成功!") else: print("下载失败!") else: return else: print(self.STATUS["210"]) def progress_bar(self, recv_size, file_size): """进度条 :param recv_size: 已接收大小 :param file_size: 总共大小 :return: """ rate = recv_size / file_size rate_num = int(rate * 100) number = int(50 * rate) r = ' [%s%s]%d%%' % ("#" * number, " " * (50 - number), rate_num,) print(" {}".format(r), end=" ") def put(self, data): """ 上传 :param data: 指令、文件名,用户名 :return: """ self.conn.send("999".encode(settings.CODING)) header_dic = self.recv_header() if header_dic["s_file_path"] is None: file_path = os.path.join(os.path.join(settings.HOME_PATH, data["username"]), data["filename"]) else: file_path = os.path.join(header_dic["s_file_path"], data["filename"]) quato = float(self.read_info()[data["username"]]["quato"]) * 1024 * 1024 * 1024 full_size = 0 for parent, dirs, files in os.walk(file_path): for file in files: fullname = os.path.join(parent, file) filesize = os.path.getsize(fullname) full_size += filesize header_dic.pop("status") header_dic["file_path"] = file_path if full_size + header_dic["file_size"] <= quato: status = "300" else: status = "310" if os.path.isfile(file_path): print("%s文件已存在" % data["filename"]) self.conn.send("000".encode(settings.CODING)) else: self.conn.send("999".encode(settings.CODING)) if self.conn.recv(3).decode(settings.CODING) == "999": self.header(status, **header_dic) if status == "300": recv_size = 0 with open(file_path, "wb")as f: # 接受真实的数据 while recv_size < header_dic["file_size"]: line = self.conn.recv(settings.MAX_PACKET_SIZE) f.write(line) recv_size += len(line) self.progress_bar(recv_size, header_dic["file_size"]) f.close() num = self.hash_md5(self.read_file(file_path)) # print(num) if num == header_dic["md5"]: self.conn.send("999".encode(settings.CODING)) print("上传成功") else: self.conn.send("000".encode(settings.CODING)) print("文件上传失败") else: print(self.STATUS[status]) else: return def dir(self, data): """ 查看当前目录 :param data: 指令、用户名 :return: """ cmd_obj = subprocess.Popen("dir %s" % self.user_current_dir, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout = cmd_obj.stdout.read() stderr = cmd_obj.stderr.read() result = stdout + stderr if not result: result = "当前目录下没有任何文件".encode("gbk") status = "200" msg = {"msg": result.decode("gbk")} self.header(status, **msg) def cd(self, data): """ 改变目录 :param data: 指令,目标路径 :return: """ current_dir = os.path.abspath(os.path.join(self.user_current_dir, data["target"])) print(current_dir) if os.path.isdir(current_dir): if current_dir.startswith(settings.HOME_PATH): status = "400" current_path = current_dir.replace(settings.HOME_PATH, "") self.user_current_dir = current_dir msg = {"path": current_dir, "current_path": current_path} self.header(status, **msg) else: status = "410" self.header(status) else: status = "410" self.header(status) def resume(self, data): """ 断点续传 :param data: 指令、文件名,用户名 :return: """ self.conn.send("999".encode(settings.CODING)) header_dict = self.recv_header() incomplete_size = header_dict["incomplete_size"] complete_size = os.path.getsize(header_dict["s_file_path"]) balance = complete_size - incomplete_size header_dict["balance"] = balance header_dict["md5"] = self.hash_md5(self.read_file(header_dict["s_file_path"])) status = "500" header_dict.pop("status") self.header(status, **header_dict) with open(header_dict["s_file_path"], "rb")as f: f.seek(incomplete_size) for line in f: self.conn.send(line) incomplete_size += len(line) self.progress_bar(incomplete_size, complete_size) f.close() res = self.conn.recv(4).decode(settings.CODING) if res == "999": print("下载成功!") else: print("下载失败!") # server = MYServer(('127.0.0.1', 8080)) # server.run()
README
服务端入口:bin-->server.py 客户端入口:client.py 客户端默认下载到download中 客户端从upload文件夹中上传文件 用户信息存在conf-->config.ini中