zoukankan      html  css  js  c++  java
  • SELECTORS模块实现并发简单版FTP

    环境:windows, python 3.5
    功能:
    使用SELECTORS模块实现并发简单版FTP
    允许多用户并发上传下载文件

    结构:
    ftp_client ---|
    bin ---|
    start_client.py ......启动客户端
    conf---|
    config.py ......客户端参数配置
    system.ini ......客户端参数配置文件
    core---|
    clients.py ......客户端主程序
    home ......默认下载路径
    ftp_server ---|
    bin ---|
    start_server.py ......启动服务端
    conf---|
    config.py ......服务端参数配置
    system.ini ......服务端参数配置文件
    core---|
    servers.py ......服务端主程序
    db ---|
    data.py ......存取用户数据
    home ......默认上传路径

    功能实现:
    客户端输入命令,根据命令通过映射进入相应方法,上传或者下载;
    服务端用SELECTORS模块实现并发,接收数据,通过action用映射进入相应方法;


    如何使用:
    启动start_server.py,启动start_client.py;
    在客户端输入get pathname/put pathname进行上传下载文件,开启多个客户端可并发上传下载。

    core:
    #!/usr/bin/env python
    # -*-coding:utf-8-*-
    # Author:zh
    import socket
    import os
    import json
    import random
    import conf
    PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep   # 默认下载存放路径为当前路径的home目录下
    
    
    class FtpClient(object):
        def __init__(self):
            self.client = socket.socket()
    
        def connect(self, ip, port):
            self.client.connect((ip, port))
    
        def interactive(self):
            # 入口
            while True:
                cmd = input(">>").strip()
                if len(cmd) == 0:
                    continue
                cmd_list = cmd.split()
                if len(cmd_list) == 2:
                    cmd_str = cmd_list[0]
                    if hasattr(self, cmd_str):
                        func = getattr(self, cmd_str)
                        func(cmd)
                    else:
                        print("输入格式错误")
                        self.help()
                else:
                    print("输入格式错误")
                    self.help()
    
        @staticmethod
        def help():
            msg = '''
    get filepath(下载文件)
    put filepath(上传文件)
            '''
            print(msg)
    
        def get(self, *args):
            # 下载
            data = args[0].split()
            action = data[0]
            path = data[1]
            send_msg = {
                "action": action,  # 第一次请求,需要服务端返回文件信息
                "file_path": path  # F:oracle课程(必读)11gOCP考试流程与须知.rar
            }
            self.client.send(json.dumps(send_msg).encode())
            data = self.client.recv(1024)
            data = json.loads(data.decode())
            if data["sign"] == "0":
                file_length = data["length"]
                file_path = PATH+data["filename"]
                if os.path.isfile(file_path):
                    file_path = file_path + str(random.randint(1,1000))
                file = open(file_path, "wb")
                send_msg_2 = {
                    "action": "get_file_data",  # 第二次请求,请求客户端给发送文件数据
                    "file_path": path  # 第二次请求依然需要传入文件路径,也可以在服务端利用全局变量保存
                }
                self.client.send(json.dumps(send_msg_2).encode())
                get_length = 0
                while get_length < int(file_length):
                    data = self.client.recv(1024)
                    file.write(data)
                    get_length += len(data)
                else:
                    file.close()
                if get_length == int(file_length):
                    print("下载成功")
                else:
                    print("文件传输失败")
                    os.remove(file_path)  # 如果传输过程出现问题,删除文件
            else:
                print("文件不存在")
    
        def put(self, *args):
            # 上传
            data = args[0].split()
            action = data[0]
            file_path = data[1]
            if os.path.isfile(file_path):
                msg = {
                    "action": action,  # 上传第一次发送,发送文件大小和姓名
                    "length": os.stat(file_path).st_size,
                    "filename": file_path[file_path.rfind("\") + 1:]
                }
                self.client.send(json.dumps(msg).encode())
                self.client.recv(1024)  # 避免黏包
                file = open(file_path, "rb")
                for line in file:
                    self.client.send(line)  # 上传第二次发送,发送文件数据,没有action,在服务端通过try,json报错的进入接收数据的方法里
                else:
                    file.close()
                sign = self.client.recv(1024)
                if sign == b'0':
                    print("上传成功")
                else:
                    print("上传失败")
            else:
                print("文件不存在")
    
    
    def run():
        data = conf.config.Configuration()
        conf_data = data.get_config()
        obj = FtpClient()
        obj.connect(conf_data[0][1], int(conf_data[1][1]))
        obj.interactive()
    clients

    core:

    #!/usr/bin/env python
    # -*-coding:utf-8-*-
    # Author:zh
    import selectors
    import socket
    import json
    import os
    import random
    import conf
    PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep  # 默认上传存放路径为当前路径的home目录下
    
    
    class SelectorFtp(object):
    
        def __init__(self):
            self.sel = selectors.DefaultSelector()
    
        def connect(self, ip, port):
            sock = socket.socket()
            sock.bind((ip, port))
            sock.listen(100)
            sock.setblocking(False)
            self.sel.register(sock, selectors.EVENT_READ, self.accept)
            while True:
                events = self.sel.select()
                for key, mask in events:
                    callback = key.data
                    callback(key.fileobj, mask)
    
        def accept(self, sock, mask):
            conn, addr = sock.accept()  # Should be ready
            conn.setblocking(False)
            self.sel.register(conn, selectors.EVENT_READ, self.interactive)
    
        def interactive(self, conn, mask):
            # 回调函数,每次接收消息都判断action,根据action分别调用不同函数信息交互
            try:
                data = conn.recv(1024)
                if data:
                    try:
                        cmd_dict = json.loads(data.decode())
                        action = cmd_dict['action']
                        if hasattr(self, action):
                            func = getattr(self, action)
                            func(cmd_dict, conn)
                    except (UnicodeDecodeError, json.decoder.JSONDecodeError, TypeError) as e:
                        # put发送文件数据,无法通过json打包,采取json报错后进入方法循环接收
                        self.put_file_data(data, conn)
                else:
                    self.sel.unregister(conn)
                    conn.close()
            except ConnectionResetError as e:
                print(e)
                # 客户端意外断开时注销链接
                self.sel.unregister(conn)
                conn.close()
    
        def get(self, *args):
            # 下载第一步:判断文件是否存在并返回文件大小和文件名
            conn = args[1]
            file_path = args[0]["file_path"]
            if os.path.isfile(file_path):
                sign = "0"
                length = os.stat(file_path).st_size
            else:
                sign = "1"
                length = None
            msg = {
                "sign": sign,
                "length": length,
                "filename": file_path[file_path.rfind("\")+1:]
            }
            conn.send(json.dumps(msg).encode())
    
        def get_file_data(self, *args):
            # 下载第二步,传送文件数据
            conn = args[1]
            file_path = args[0]["file_path"]
            file = open(file_path, "rb")
            for line in file:
                conn.send(line)
            else:
                file.close()
    
        def put(self, *args):
            # 上传第一步,接收文件大小和文件名
            conn = args[1]
            self.length = args[0]["length"]
            self.filename = PATH + args[0]["filename"]
            if os.path.isfile(self.filename):
                self.filename = self.filename + str(random.randint(1, 1000))
            self.file = open(self.filename, "wb")
            self.recv_size = 0
            conn.send(b"0")  # 避免客户端黏包
    
        def put_file_data(self, *args):
            # json报错后进入此循环接收数据
            conn = args[1]
            self.recv_size += len(args[0])
            self.file.write(args[0])
            if int(self.length) == self.recv_size:
                self.file.close()
                conn.send(b"0")
    
    
    def run():
        data = conf.config.Configuration()
        conf_data = data.get_config()
        obj = SelectorFtp()
        obj.connect(conf_data[0][1], int(conf_data[1][1]))
    servers

    config:

    #!/usr/bin/env python
    # -*-coding:utf-8-*-
    # _author_=zh
    import os
    import configparser
    PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    
    
    class Configuration(object):
        def __init__(self):
            self.config = configparser.ConfigParser()
            self.name = PATH+os.sep+"conf"+os.sep+"system.ini"
    
        def init_config(self):
            # 初始化配置文件,ip :客户端IP,port:客户端端口
            if not os.path.exists(self.name):
                self.config["config"] = {"ip": "localhost", "port": 1234}
                self.config.write(open(self.name, "w", encoding="utf-8", ))
    
        def get_config(self, head="config"):
            '''
            获取配置文件数据
            :param head: 配置文件的section,默认取初始化文件config的数据
            :return:返回head中的所有数据(列表)
            '''
            self.init_config()  # 取文件数据之前生成配置文件
            self.config.read(self.name, encoding="utf-8")
            if self.config.has_section(head):
                section = self.config.sections()
                return self.config.items(section[0])
    config





  • 相关阅读:
    python 检测mobileprovision证书的过期时间
    dynamodb 分区键排序键介绍
    dynamodb 基本操作
    Python 实现一个栈
    openstack阅读链接
    mongoengine文档
    机器学习链接
    mongoengine的使用
    Timer(让函数定时执行)
    线程,进程,IO多路复用,协程的代码
  • 原文地址:https://www.cnblogs.com/zh-20170913/p/8298166.html
Copyright © 2011-2022 走看看