SELECT版FTP:
使用SELECT或SELECTORS模块实现并发简单版FTP
允许多用户并发上传下载文件
思路解析:
1. 运用IO多路复用的知识,使用已封装好的SELECTORS模块编写程序
2. 使用IO多路复用的SELECT模块编写程序
3. 最后编写多并发程序模拟用户并发上传下载文件;并发时为避免重复写入同一文件,使用random随机生成新文件名
程序核心代码
README
作者:yaobin
版本: Selectors Ftp 示例版本 v0.1
开发环境: python3.6
程序介绍
1. 使用SELECT或SELECTORS模块实现并发简单版FTP
2. 允许多用户并发上传下载文件
文件目录结构
├─bin
│ __init__.py
│ client.py #客户端主程序
│ server.py #服务端主程序
│
├─conf
│ setting.py #配置文件
│ __init__.py
│
│
├─core
│ │ client_main.py #客户端交互程序
│ │ selectors_client.py #selectors客户端主程序
│ │ selectors_server.py #selectors服务端主程序
│ │ select_client.py #select客户端主程序
│ │ select_server.py #select服务端主程序
│ │ server_main.py #server端主程序
│ │ __init__.py
│ │
│ └─__pycache__ #pyc文件目录
│ client_main.cpython-36.pyc
│
├─db
│ │ __init__.py
│ │
│ ├─Client_DownLoad
│ ├─Server_DownLoad
│ ├─Server_Upload
│ └─test
│ test.log
│ test.py
│ __init__.py
│
└─logs
__init__.py
conf
setting.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
'''主配置文件'''
import os
import sys
import platform
# Project root: two directory levels above this configuration file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

# Upload/download directories used by the servers and the clients.
# os.path.join selects the correct separator for the running platform, so no
# per-OS branch is needed (the old branch compared the ``platform`` module
# itself to 'Windows', which is never true).
download_path = os.path.join(BASE_DIR, "db", "Server_DownLoad")
upload_path = os.path.join(BASE_DIR, "db", "Server_Upload")
client_download_path = os.path.join(BASE_DIR, "db", "Client_DownLoad")
core
client_main.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
'''客户端交互程序'''
import os
import sys
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from core import selectors_client
from core import select_client
class client_ftp(object):
    """Interactive menu that launches one of the two FTP clients."""

    def start(self):
        """Print the menu in a loop and run whichever client is chosen.

        Options 1 and 2 create a client, connect it to localhost:10000 and
        hand control to its own prompt; 3 / q / exit terminate the process.

        :return: does not return normally; the process ends via sys.exit
        """
        print('欢迎进入Select Ftp')
        menu = ('\n'
                '1.selectors模块客户端上传下载测试\n'
                '2.select客户端上传下载测试\n'
                '3.exit\n')
        quit_words = ('3', 'q', 'exit')
        while True:
            print(menu)
            choice = input('请选择操作>>>:')
            if choice in ('1', '2'):
                # Resolve the client class lazily, only for the chosen option.
                if choice == '1':
                    session = selectors_client.selectors_client()
                else:
                    session = select_client.select_client()
                session.connect("localhost", 10000)
                session.start()
                continue
            if choice in quit_words:
                sys.exit('程序退出')
            print('非法操作,请重新输入')
select_client.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
'''selectclient交互程序'''
import sys
import os
import time
import platform
import random
import socket
import json
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from conf import setting
class select_client(object):
    """Interactive FTP client for the select()-based server.

    Commands are sent to the server as plain text (``put <name>`` or
    ``get <name>``); file contents follow as raw bytes.
    """

    # Number of bytes read from disk / requested from the socket per step.
    CHUNK_SIZE = 1024

    def __init__(self):
        """Create the client socket (not connected yet)."""
        self.client = socket.socket()

    @staticmethod
    def _now():
        """Return the current local time formatted for log lines."""
        return time.strftime("%Y-%m-%d %X", time.localtime())

    @staticmethod
    def _file_name_of(path):
        """Return the last component of *path* using the platform's rules."""
        return os.path.basename(path.strip())

    def start(self):
        """Run the command prompt until the user types ``exit``.

        Accepted input: ``put <path>``, ``get <path>`` and ``exit``.
        Socket errors are reported and the prompt keeps running.

        :return: None
        """
        print('login time : %s' % self._now())
        while True:
            try:
                self.sending_msg = input('[root@select_ftp_client]# ')
                self.sending_msg_list = self.sending_msg.split()
                # Check for empty input *before* indexing the token list.
                if not self.sending_msg_list:
                    continue
                self.action = self.sending_msg_list[0]
                if len(self.sending_msg_list) == 1:
                    if self.action == "exit":
                        print('logout')
                        break
                    print(self._now(),
                          '-bash : %s command not found' % self.action)
                    continue
                self.file_path = self.sending_msg_list[1]
                self.file_name = self._file_name_of(self.file_path)
                if self.action == "put":
                    self.put(self.action, self.file_name)
                elif self.action == "get":
                    self.get(self.action, self.file_name)
                else:
                    print(self._now(), '[+]client:-bash: %s:'
                          % self.action, 'command not found')
            # A tuple is required here: ``except A and B`` only catches the
            # last class of the ``and`` chain.
            except (ConnectionResetError, ConnectionRefusedError,
                    OSError, IndexError) as e:
                print(self._now(), '[+]client: -bash :', e, 'Restart client')

    def put(self, action, file_name):
        """Upload ``self.file_path`` to the server.

        The command and file name are taken from ``self.action`` and
        ``self.file_name`` (set by :meth:`start`).

        :param action: upload command (kept for call compatibility)
        :param file_name: name of the file (kept for call compatibility)
        :return: None
        """
        if not os.path.isfile(self.file_path):
            print(self._now(), '[+]client:-bash: %s:'
                  % self.file_name, 'file not found')
            return
        # Reject empty files before announcing the upload, so the server is
        # not left waiting for data that never arrives.
        if os.stat(self.file_path).st_size == 0:
            print(self._now(), '[+]client:-bash: %s:'
                  % self.file_name, 'file not allow null')
            return
        cmd = self.action + " " + self.file_name
        self.client.sendall(cmd.encode())
        self.client.recv(1024)  # wait for the server's acknowledgement
        trans_size = 0
        with open(self.file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(self.CHUNK_SIZE), b''):
                self.client.sendall(chunk)
                trans_size += len(chunk)
        # NOTE(review): the pause presumably keeps the end marker from being
        # merged with the last data chunk in one server-side recv() - confirm.
        time.sleep(0.5)
        print("\n文件上传完成。 文件大小:[%s]字节" % trans_size)
        self.client.sendall(b'put done(status:200)')

    def get(self, action, file_name):
        """Download ``self.file_name`` into the client download directory.

        The file is stored under ``<name>.<random number>`` so concurrent
        downloads of the same file do not overwrite each other.

        :param action: download command (kept for call compatibility)
        :param file_name: name of the file (kept for call compatibility)
        :return: None
        :raises ConnectionResetError: the server closed the connection before
            the whole file arrived
        """
        cmd = self.action + " " + self.file_name
        self.client.sendall(cmd.encode())
        # NOTE(review): assumes the JSON header arrives alone in this recv();
        # TCP does not guarantee message boundaries - confirm with the server.
        data = self.client.recv(1024)
        file_msg = json.loads(data.decode())
        file_status = file_msg['status']
        # Never let the peer choose a directory for the local file.
        file_name = os.path.basename(file_msg['filename'])
        if file_status == 550:
            print(self._now(), '[+]client:-bash: %s:'
                  % self.file_name, 'file not found')
        elif file_status == 200:
            file_size = file_msg['size']
            new = random.randint(1, 100000)
            # Build an explicit path instead of os.chdir(), which would change
            # how later relative ``put`` paths are resolved.
            target = os.path.join(setting.client_download_path,
                                  file_name + '.' + str(new))
            receive_size = 0
            with open(target, 'wb') as file_object:
                while receive_size < file_size:
                    data = self.client.recv(
                        min(self.CHUNK_SIZE, file_size - receive_size))
                    if not data:
                        # recv() returning b'' means the peer closed; without
                        # this check the loop would spin forever.
                        raise ConnectionResetError(
                            'connection closed before the whole file was received')
                    file_object.write(data)
                    receive_size += len(data)
            print(self._now(),
                  "[+]client: -bash :File get done File size is :", file_size)

    def connect(self, ip, port):
        """Connect the client socket to the server.

        :param ip: server IP address or host name
        :param port: server port
        :return: None
        """
        self.client.connect((ip, port))
select_server.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os
import json
import sys
import random
import time
import select
import socket
import queue
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from conf import setting
class select_ftp(object):
    """FTP server built on select().

    Clients send ``put <name>`` / ``get <name>`` as text. Anything else that
    arrives on a connection is treated as data for the upload in progress.
    """

    # Only these client commands may be dispatched to methods; dispatching on
    # hasattr() would let a client invoke any attribute of the server.
    COMMANDS = ('put', 'get')
    # Number of bytes read from disk per download step.
    CHUNK_SIZE = 1024

    def __init__(self, ip, port):
        """Create the listening socket and the bookkeeping tables.

        :param ip: IP address to listen on
        :param port: port to listen on
        """
        self.server = socket.socket()
        self.host = ip
        self.port = port
        self.msg_dic = {}  # conn -> queue of chunks waiting to be sent
        self.inputs = [self.server, ]
        self.outputs = []
        self.file_flag = {}  # conn -> [size, bytes queued so far, file path]
        self.file_up_flag = {}  # conn -> file name of the upload in progress

    @staticmethod
    def _now():
        """Return the current local time formatted for log lines."""
        return time.strftime("%Y-%m-%d %X", time.localtime())

    def start(self):
        """Bind, listen and run the select() loop forever.

        :return: does not return
        """
        self.server.bind((self.host, self.port))
        self.server.listen(1000)
        self.server.setblocking(False)
        while True:
            readable, writeable, exceptional = select.select(
                self.inputs, self.outputs, self.inputs)
            for r in readable:
                self.readable(r)
            for w in writeable:
                self.writeable(w)
            for e in exceptional:
                self.clean(e)

    def readable(self, ser):
        """Handle a socket that select() reported as readable.

        For the listening socket a new connection is accepted. For a client
        socket the data is either dispatched as a command or appended to the
        current upload.

        :param ser: the listening socket or a client connection
        :return: None
        """
        if ser is self.server:
            conn, addr = self.server.accept()
            print(self._now(), ': echoing: newlink ', addr)
            self.inputs.append(conn)
            self.msg_dic[conn] = queue.Queue()
            return
        try:
            data = ser.recv(1024)
            if not data:
                # b'' means the client closed the connection.
                print(self._now(), ": client lost", ser)
                self.clean(ser)
                return
            try:
                cmd = data.decode()
                parts = cmd.split()
            except UnicodeDecodeError:
                # Binary payload: can only be upload data.
                cmd, parts = None, []
            if len(parts) == 2 and parts[0] in self.COMMANDS:
                print(self._now(), ': echoing: newlink ', cmd)
                getattr(self, parts[0])(ser, parts[1])
            else:
                self.upload(ser, data)
        except ConnectionError:
            print(self._now(), ": client lost", ser)
            self.clean(ser)

    def writeable(self, conn):
        """Send the next queued chunk to a client and schedule the following one.

        :param conn: client connection
        :return: None
        """
        try:
            # get_nowait(): a blocking get() on an empty queue would freeze
            # the whole event loop.
            data_to_client = self.msg_dic[conn].get_nowait()
            conn.sendall(data_to_client)
        except (KeyError, queue.Empty, OSError):
            print(self._now(), ': error client lost')
            self.clean(conn)
            return
        if conn in self.outputs:
            self.outputs.remove(conn)
        state = self.file_flag.get(conn)
        if state is None:
            return
        size, trans_size, filename = state
        if trans_size < size:
            self.load(conn, filename, size)
        else:
            del self.file_flag[conn]

    def clean(self, conn):
        """Forget everything about a connection and close it.

        :param conn: client connection
        :return: None
        """
        if conn in self.outputs:
            self.outputs.remove(conn)
        if conn in self.inputs:
            self.inputs.remove(conn)
        self.msg_dic.pop(conn, None)
        self.file_flag.pop(conn, None)
        self.file_up_flag.pop(conn, None)
        conn.close()

    def put(self, conn, filename):
        """Handle a ``put`` command: start or finish an upload.

        ``put done(status:200)`` ends the current upload. Any other name
        starts a new one; an existing file of that name is first renamed to
        ``<name>.<random number>``.

        :param conn: client connection
        :param filename: file name sent by the client, or the end marker
        :return: None
        """
        if filename == "done(status:200)":
            self.file_up_flag.pop(conn, None)
            return
        # Strip directory components so a client cannot write outside the
        # upload directory.
        filename = os.path.basename(filename)
        if not filename:
            self.clean(conn)
            return
        target = os.path.join(setting.upload_path, filename)
        if os.path.isfile(target):
            try:
                new = random.randint(1, 100000)
                self.rename(target, target + '.' + str(new))
            except FileExistsError:
                os.remove(target)
        print(self._now(), ': server recv download data')
        conn.send(b'200')
        self.file_up_flag[conn] = filename

    def upload(self, conn, data):
        """Append received bytes to the upload registered for *conn*.

        Data from a connection without an upload in progress is dropped.

        :param conn: client connection
        :param data: bytes received from the client
        :return: None
        """
        filename = self.file_up_flag.get(conn)
        if filename is None:
            return
        with open(os.path.join(setting.upload_path, filename), 'ab') as file_object:
            file_object.write(data)

    def get(self, conn, filename):
        """Handle a ``get`` command: send the JSON header, then queue the file.

        The header's ``status`` is 200 when the file exists, 550 otherwise.

        :param conn: client connection
        :param filename: file name requested by the client
        :return: None
        """
        # Strip directory components so a client cannot read outside the
        # download directory.
        name = os.path.basename(filename)
        path = os.path.join(setting.download_path, name)
        msg_dic = {  # description of the requested file
            "action": "get",
            "filename": name,
            "size": None,
            "status": 550
        }
        found = os.path.isfile(path)
        if found:
            size = os.stat(path).st_size
            msg_dic['size'] = size
            msg_dic['status'] = 200
        conn.send(json.dumps(msg_dic).encode())
        if found:
            # Drop any stale progress so the new transfer starts at offset 0.
            self.file_flag.pop(conn, None)
            self.load(conn, path, size)

    def load(self, conn, filename, size):
        """Queue the next chunk of a download and mark *conn* as writable.

        :param conn: client connection
        :param filename: path of the file being sent
        :param size: total size announced to the client
        :return: None
        """
        if conn in self.file_flag:
            trans_size = self.file_flag[conn][1]
        else:
            trans_size = 0
        with open(filename, "rb") as f:
            f.seek(trans_size)
            # Fixed-size reads: readline() on binary data may return a chunk
            # of arbitrary size.
            data = f.read(self.CHUNK_SIZE)
        if not data and trans_size < size:
            # The file became shorter than announced; the transfer can never
            # complete, so drop the client instead of looping forever.
            self.clean(conn)
            return
        self.msg_dic[conn].put(data)
        if conn not in self.outputs:
            self.outputs.append(conn)
        trans_size += len(data)
        self.file_flag[conn] = [size, trans_size, filename]

    def rename(self, old_name, new_name):
        """Rename a file, replacing *new_name* if it already exists.

        :param old_name: current path
        :param new_name: desired path
        :return: None
        """
        if os.path.exists(new_name):
            os.remove(new_name)
        os.rename(old_name, new_name)
selectors_client.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os
import sys
import json
import time
import random
import socket
import platform
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from conf import setting
class selectors_client(object):
    """Interactive FTP client for the selectors-based server.

    Every request starts with a JSON header of the form
    ``{"client": {"action": ..., "file_name": ..., "size": ...}}``.
    """

    # Number of bytes read from disk / requested from the socket per step.
    CHUNK_SIZE = 1024

    def __init__(self):
        """Create the client socket (not connected yet)."""
        self.client = socket.socket()

    @staticmethod
    def _now():
        """Return the current local time formatted for log lines."""
        return time.strftime("%Y-%m-%d %X", time.localtime())

    @staticmethod
    def _file_name_of(path):
        """Return the last component of *path* using the platform's rules."""
        return os.path.basename(path.strip())

    def start(self):
        """Run the command prompt until ``exit`` or a socket error.

        Accepted input: ``put <path>``, ``get <path>`` and ``exit``.

        :return: None
        """
        print('login time : %s' % self._now())
        while True:
            try:
                self.sending_msg = input('[root@select_ftp_client]# ')
                self.sending_msg_list = self.sending_msg.split()
                # Check for empty input *before* indexing the token list.
                if not self.sending_msg_list:
                    continue
                self.action = self.sending_msg_list[0]
                if len(self.sending_msg_list) == 1:
                    if self.action == "exit":
                        print('logout')
                        break
                    print(self._now(),
                          '-bash : %s command not found' % self.action)
                    continue
                self.file_path = self.sending_msg_list[1]
                self.file_name = self._file_name_of(self.file_path)
                if self.action == "put":
                    self.put()
                elif self.action == "get":
                    self.get()
                else:
                    print(self._now(), '[+]client:-bash: %s:'
                          % self.action, 'command not found')
            # A tuple is required here: ``except A and B`` only catches the
            # last class of the ``and`` chain.
            except (ConnectionResetError, ConnectionRefusedError,
                    OSError, IndexError) as e:
                print(self._now(), '[+]client: -bash :', e, 'Restart client')
                # Leave the prompt so the caller can create and connect a new
                # client; starting a fresh, unconnected client from here would
                # fail on every following command.
                break

    def put(self):
        """Upload ``self.file_path`` to the server.

        Sends the JSON header, waits for the acknowledgement, streams the
        file and prints the server's completion message.

        :return: None
        """
        if not os.path.isfile(self.file_path):
            print(self._now(), '[+]client: -bash :%s : No such file'
                  % self.file_name)
            return
        self.file_size = os.path.getsize(self.file_path)
        data_header = {"client": {
            "action": "put",
            "file_name": self.file_name,
            "size": self.file_size}}
        self.client.sendall(json.dumps(data_header).encode())
        print(self._now(), '[+]server: -bash : %s '
              % self.client.recv(1024).decode())
        with open(self.file_path, 'rb') as file_object:
            for chunk in iter(lambda: file_object.read(self.CHUNK_SIZE), b''):
                self.client.sendall(chunk)
        print(self.client.recv(1024).decode())

    def get(self):
        """Download ``self.file_name`` into the client download directory.

        The file is stored under ``<name>.<random number>`` so concurrent
        downloads of the same file do not overwrite each other.

        :return: None
        :raises ConnectionResetError: the server closed the connection before
            the whole file arrived
        """
        data_header = {"client": {
            "action": "get",
            "file_name": self.file_name,
            "size": 0}}
        self.client.sendall(json.dumps(data_header).encode())
        # NOTE(review): assumes the size reply arrives alone in this recv();
        # TCP does not guarantee message boundaries - confirm with the server.
        self.data = self.client.recv(1024)
        reply = self.data.decode()
        if reply == '404':
            print(self._now(),
                  '[+]server: -bash : %s : No such file' % (self.file_path))
            return
        print(self._now(),
              "[+]server: -bash : File ready to get File size is :", reply)
        file_size = int(reply)
        new = random.randint(1, 100000)
        # Build an explicit path instead of os.chdir(), which would change
        # how later relative ``put`` paths are resolved.
        target = os.path.join(setting.client_download_path,
                              self.file_name + '.' + str(new))
        received_size = 0
        with open(target, 'wb') as file_object:
            while received_size < file_size:
                recv_data = self.client.recv(
                    min(self.CHUNK_SIZE, file_size - received_size))
                if not recv_data:
                    # recv() returning b'' means the peer closed; without
                    # this check the loop would spin forever.
                    raise ConnectionResetError(
                        'connection closed before the whole file was received')
                received_size += len(recv_data)
                file_object.write(recv_data)
        time.sleep(0.1)
        print(self._now(),
              "[+]client: -bash :File get done File size is :", file_size)

    def connect(self, ip, port):
        """Connect the client socket to the server.

        :param ip: server IP address or host name
        :param port: server port
        :return: None
        """
        self.client.connect((ip, port))
selectors_server.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
import os
import sys
import json
import selectors
import socket
import time
import errno
import random
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from conf import setting
# Selector shared by the listening socket and every client connection.
sel = selectors.DefaultSelector()


class selectors_ftp(object):
    """FTP server built on the selectors module.

    Each request starts with a JSON header of the form
    ``{"client": {"action": ..., "file_name": ..., "size": ...}}``.
    A transfer is handled to completion inside the read callback, so the
    per-request attributes (``action``, ``file_name``, ``file_size``) are
    only valid for the request being processed.
    """

    # Number of bytes read from disk / requested from the socket per step.
    CHUNK_SIZE = 1024

    def __init__(self):
        """Create the listening socket (not bound yet)."""
        self.sock = socket.socket()

    @staticmethod
    def _now():
        """Return the current local time formatted for log lines."""
        return time.strftime("%Y-%m-%d %X", time.localtime())

    @staticmethod
    def _peer(conn):
        """Return the remote address of *conn*, or None if unavailable."""
        try:
            return conn.getpeername()
        except OSError:
            return None

    def _close(self, conn, mask):
        """Unregister *conn* from the selector and close it."""
        print(self._now(), ': closing:', conn, mask)
        sel.unregister(conn)
        conn.close()

    def upload(self, conn, mask):
        """Receive ``self.file_size`` bytes from *conn* into the upload directory.

        The file is stored as ``<file_name>.<random number>`` so concurrent
        uploads of the same name do not collide.

        :param conn: client connection that sent the ``put`` request
        :param mask: selector event mask (unused)
        :return: None
        """
        conn.send(b'Server receive upload %s request' % self.file_name.encode())
        new = random.randint(1, 100000)  # random suffix for concurrency tests
        target = os.path.join(setting.upload_path,
                              self.file_name + '.' + str(new))
        received_size = 0
        # The transfer runs to completion here, so use blocking mode rather
        # than polling a non-blocking socket in a sleep loop.
        conn.setblocking(True)
        try:
            with open(target, 'wb') as file_object:
                while received_size < self.file_size:
                    recv_data = conn.recv(
                        min(self.CHUNK_SIZE, self.file_size - received_size))
                    if not recv_data:
                        # Peer closed early; stop instead of looping forever.
                        break
                    received_size += len(recv_data)
                    file_object.write(recv_data)
        finally:
            conn.setblocking(False)

    def download(self, conn, mask):
        """Send ``self.file_name`` from the download directory to *conn*.

        Sends the file size as text followed by the contents, or ``404`` when
        the file does not exist.

        :param conn: client connection that sent the ``get`` request
        :param mask: selector event mask (unused)
        :return: None
        """
        path = os.path.join(setting.download_path, self.file_name)
        if not os.path.isfile(path):
            conn.send(b'404')
            return
        file_size = os.path.getsize(path)
        # sendall() on a non-blocking socket can fail part-way through, and
        # retrying would resend the size header and the file from the start;
        # blocking mode makes the transfer all-or-error.
        conn.setblocking(True)
        try:
            conn.sendall(str(file_size).encode())
            with open(path, "rb") as file_obj:
                for chunk in iter(lambda: file_obj.read(self.CHUNK_SIZE), b''):
                    conn.sendall(chunk)
        finally:
            conn.setblocking(False)

    def accept(self, sock, mask):
        """Accept a new client and register it for read events.

        :param sock: listening socket
        :param mask: selector event mask
        :return: None
        """
        conn, addr = sock.accept()
        print(self._now(), ': accepted', conn, 'from', addr, mask)
        conn.setblocking(False)
        sel.register(conn, selectors.EVENT_READ, self.read)

    def read(self, conn, mask):
        """Read one request header from *conn* and carry out the transfer.

        The connection is closed when the peer disconnected, the header is
        not valid, or the connection breaks during the transfer.

        :param conn: client connection reported readable by the selector
        :param mask: selector event mask
        :return: None
        """
        try:
            self.data = conn.recv(1024)
        except BlockingIOError:
            return  # nothing to read after all; wait for the next event
        except ConnectionError:
            self._close(conn, mask)
            return
        if not self.data:
            self._close(conn, mask)
            return
        try:
            self.data_receive = json.loads(self.data.decode())
            request = self.data_receive['client']
            self.action = request['action']
            # Strip directory components so a client cannot reach files
            # outside the upload/download directories.
            self.file_name = os.path.basename(request['file_name'])
            self.file_size = int(request['size'])
        except (ValueError, KeyError, TypeError):
            # Not a valid request header; drop the client rather than crash.
            self._close(conn, mask)
            return
        print(self._now(), ': echoing', repr(self.data), 'to', conn, mask)
        try:
            # Always use ``conn`` (the socket that is readable), never the
            # most recently accepted connection.
            if self.action == 'put':
                self.upload(conn, mask)
                conn.send(b'[+]server: -bash : Server receive upload %s done '
                          % self.file_name.encode())
                print(self._now(), ': client :', self._peer(conn),
                      ': upload %s done' % self.file_name)
            elif self.action == 'get':
                self.download(conn, mask)
                print(self._now(), ': client :', self._peer(conn),
                      ': download %s done' % self.file_name)
        except ConnectionError:
            self._close(conn, mask)

    def register(self, sock):
        """Register the listening socket and run the event loop forever.

        :param sock: listening socket (``self.sock`` is the one registered)
        :return: does not return
        """
        sel.register(self.sock, selectors.EVENT_READ, self.accept)
        while True:
            events = sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)

    def start(self, ip, port):
        """Bind, listen and enter the event loop.

        :param ip: IP address to listen on
        :param port: port to listen on
        :return: does not return
        """
        self.sock.bind((ip, port))
        self.sock.listen(500)
        self.sock.setblocking(False)
        self.register(self.sock)
server_main.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Colin Yao
'''server端交互程序'''
import os
import sys
import time
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from core import selectors_server
from core import select_server
class server_ftp(object):
    """Interactive menu that launches one of the two FTP servers."""

    def start(self):
        """Print the menu in a loop and start whichever server is chosen.

        Options 1 and 2 start a server on localhost:10000; 3 / q / exit
        terminate the process.

        :return: does not return normally; the process ends via sys.exit
        """
        print('欢迎进入Select Ftp')
        # Option 2 starts the select *server*; the menu used to label it as
        # a client.
        msg = ('\n'
               '1.selectors 服务端\n'
               '2.select 服务端\n'
               '3.exit 退 出\n')
        while True:
            print(msg)
            user_choice = input('请选择操作>>>:')
            if user_choice == '1':
                server = selectors_server.selectors_ftp()
                print(time.strftime("%Y-%m-%d %X", time.localtime()),
                      '[+]selectors server ftp already work ')
                server.start("localhost", 10000)
            elif user_choice == '2':
                server = select_server.select_ftp("localhost", 10000)
                print(time.strftime("%Y-%m-%d %X", time.localtime()),
                      '[+]select server ftp already work ')
                server.start()
            elif user_choice in ('3', 'q', 'exit'):
                sys.exit('程序退出')
            else:
                print('非法的操作,请重新输入')
程序测试样图
不太清楚Windows上有没有类似ulimit的配置,其默认对连接数是有限制的;Linux上则可以通过ulimit修改该限制
Windows : Win10
500链接测试命令返回效果
Linux (VMware) Centos6.7
ulimit文件
1W链接测试命令返回效果
5W链接测试命令返回效果