zoukankan      html  css  js  c++  java
  • 并发网络编程

    1.网络编程

      OSI 7层模型

       TCP/IP模型

     通信地址

    IP地址相关命令

      ifconfig:查看Linux系统下计算机的IP地址

      ping[ip]:  查看计算机的连通性.

    UDP传输方法

        套接字(Socket):实现网络编程进行数据传输的一种技术手段,网络上各种各样的网络服务大多都是基于 Socket 来完成通信的。

        模块: import socker

      UDP套接字编程

    sockfd=socket.socket(socket_family,socket_type,proto=0)
    功能:创建套接字
    参数:socket_family  网络地址类型 AF_INET表示ipv4
         socket_type  套接字类型 SOCK_DGRAM 表示udp套接字 (也叫数据报套接字) 
         proto  通常为0  选择子协议
    返回值: 套接字对象

      绑定地址

    本地地址 : 'localhost' , '127.0.0.1'

    网络地址 : '172.40.91.185' (通过ifconfig查看)

    自动获取地址: '0.0.0.0'

    sockfd.bind(addr)
    功能: 绑定本机网络地址
    参数: 二元元组 (ip,port)  ('0.0.0.0',8888)

    消息收发

    data,addr = sockfd.recvfrom(buffersize)
    功能: 接收UDP消息
    参数: 每次最多接收多少字节
    返回值: data  接收到的内容
            addr  消息发送方地址
    
    n = sockfd.sendto(data,addr)
    功能: 发送UDP消息
    参数: data  发送的内容 bytes格式
          addr  目标地址
    返回值:发送的字节数

    关闭套接字

    sockfd.close()
    功能:关闭套接字

    TCP传输方法

    三次握手(建立连接)

      客户端向服务器发送消息报文请求连接

      服务器收到请求后,回复报文确定可以连接

      客户端收到回复,发送最终报文连接建立

     四次挥手(断开连接)

      主动方发送报文请求断开连接

      被动方收到请求后,立即回复,表示准备断开

      被动方准备就绪,再次发送报文表示可以断开

      主动方收到确定,发送最终报文完成断开

     TCP服务端

      1.创建套接字

    sockfd=socket.socket(socket_family,socket_type,proto=0)
    功能:创建套接字
    参数:socket_family  网络地址类型 AF_INET表示ipv4
         socket_type  套接字类型 SOCK_STREAM 表示tcp套接字 (也叫流式套接字) 
         proto  通常为0  选择子协议
    返回值: 套接字对象

      2.绑定地址

      3.设置监听

    sockfd.listen(n)
    功能 : 将套接字设置为监听套接字,确定监听队列大小
    参数 : 监听队列大小

      4.处理客户端连接请求

    connfd,addr = sockfd.accept()
    功能: 阻塞等待处理客户端请求
    返回值: connfd  客户端连接套接字
            addr  连接的客户端地址

      5.消息收发

    data = connfd.recv(buffersize)
    功能 : 接受客户端消息
    参数 :每次最多接收消息的大小
    返回值: 接收到的内容
    
    n = connfd.send(data)
    功能 : 发送消息
    参数 :要发送的内容  bytes格式
    返回值: 发送的字节数

      6.关闭套接字

    TCP客户端

    创建套接字

    请求连接

    sockfd.connect(server_addr)
    功能:连接服务器
    参数:元组  服务器地址

    收发消息

      注意: 防止两端都阻塞,recv send要配合

    关闭套接字

     TCP套接字细节

    1.tcp连接中当一端退出,另一端如果阻塞在recv,此时recv会立即返回一个空字串。

    2.tcp连接中如果一端已经不存在,仍然试图通过send向其发送数据则会产生BrokenPipeError

    3. 一个服务端可以同时连接多个客户端,也能够重复被连接

    4.tcp粘包问题

      产生原因

        1.为了解决数据再传输过程中可能产生的速度不协调问题,操作系统设置了缓冲区

        2.实际网络工作过程比较复杂,导致消息收发速度不一致

        3.tcp以字节流方式进行数据传输,在接收时不区分消息边界

      影响

        如果每次发送内容是一个独立的含义,需要接收端独立解析此时粘包会有影响。

      处理方法

        人为的添加消息边界,用作消息之间的分割

        控制发送的速度 

    多任务编程   

      进程(Process)

    状态:(三态)

      就绪态: 进程具备执行条件,等待系统调度分配cpu资源

      运行态 : 进程占有cpu正在运行

      等待态 : 进程阻塞等待,此时会让出cpu

       (五态)

      新建 : 创建一个进程,获取资源的过程

      终止 : 进程结束,释放资源的过程

    进程命令

      查看进程信息

    ps -aux
    • USER : 进程的创建者

    • PID : 操作系统分配给进程的编号,大于0的整数,系统中每个进程的PID都不重复。PID也是重要的区分进程的标志。

    • %CPU,%MEM : 占有的CPU和内存

    • STAT : 进程状态信息,S I 表示阻塞状态 ,R 表示就绪状态或者运行状态

    • START : 进程启动时间

    • COMMAND : 通过什么程序启动的进程

    进程树形结构

    pstree

    多进程编程

    使用模块: multiprocessing

    创建流程

    【1】 将需要新进程执行的事件封装为函数

    【2】 通过模块的Process类创建进程对象,关联函数

    【3】 可以通过进程对象设置进程信息及属性

    【4】 通过进程对象调用start启动进程

    【5】 通过进程对象调用join回收进程资源

    Process()
    功能 : 创建进程对象
    参数 : target 绑定要执行的目标函数 
           args 元组,用于给target函数位置传参
           kwargs 字典,给target函数键值传参
    p.start()
    功能 : 启动进程

    注意 : 启动进程此时target绑定函数开始执行,该函数作为新进程执行内容,此时进程真正被创建

    p.join([timeout])
    功能:阻塞等待回收进程
    参数:超时时间

    进程执行现象理解 (难点)

    • 新的进程是原有进程的子进程,子进程复制父进程全部内存空间代码段,一个进程可以创建多个子进程。

    • 子进程只执行指定的函数,其余内容均是父进程执行内容,但是子进程也拥有其他父进程资源。

    • 各个进程在执行上互不影响,也没有先后顺序关系。

    • 进程创建后,各个进程空间独立,相互没有影响。

    • multiprocessing 创建的子进程中无法使用标准输入(input)。

    进程对象属性

    • p.name 进程名称

    • p.pid 对应子进程的PID号

    • p.is_alive() 查看子进程是否在生命周期

    • p.daemon 设置父子进程的退出关系

      • 如果设置为True则该子进程会随父进程的退出而结束

      • 要求必须在start()前设置

      • 如果daemon设置成True 通常就不会使用 join()

    进程处理细节

    os.getpid()
    功能: 获取一个进程的PID值
    返回值: 返回当前进程的PID 
    os.getppid()
    功能: 获取父进程的PID号
    返回值: 返回父进程PID
    sys.exit(info)
    功能:退出进程
    参数:字符串 表示退出时打印内容

    孤儿和僵尸

    • 孤儿进程 : 父进程先于子进程退出,此时子进程成为孤儿进程。

      • 特点: 孤儿进程会被系统进程收养,此时系统进程就会成为孤儿进程新的父进程,孤儿进程退出该进程会自动处理。

    • 僵尸进程 : 子进程先于父进程退出,父进程又没有处理子进程的退出状态,此时子进程就会成为僵尸进程。

      • 特点: 僵尸进程虽然结束,但是会存留部分进程信息资源在内存中,大量的僵尸进程会浪费系统的内存资源。

      • 如何避免僵尸进程产生

        1. 使用join()回收

        2. 在父进程中使用signal方法处理

    from signal import *
    signal(SIGCHLD,SIG_IGN)

    创建进程类

    • 创建步骤

      【1】 继承Process类

      【2】 重写__init__方法添加自己的属性,使用super()加载父类属性

      【3】 重写run()方法

    • 使用方法

      【1】 实例化对象

      【2】 调用start自动执行run方法

      【3】 调用join回收进程

    进程池

    • 必要性

      【1】 进程的创建和销毁过程消耗的资源较多

      【2】 当任务量众多,每个任务在很短时间内完成时,需要频繁的创建和销毁进程。此时对计算机压力较大

      【3】 进程池技术很好的解决了以上问题。

    • 原理

      创建一定数量的进程来处理事件,事件处理完进 程不退出而是继续处理其他事件,直到所有事件全都处理完毕统一销毁。增加进程的重复利用,降低资源消耗。

    • 进程池实现

    1. 创建进程池对象,放入适当的进程

    from multiprocessing import Pool
    
    Pool(processes)
    功能: 创建进程池对象
    参数: 指定进程数量,默认根据系统自动判定

       2.将事件加入进程池队列执行

    pool.apply_async(func,args,kwds)
    功能: 使用进程池执行 func事件
    参数: func 事件函数
          args 元组  给func按位置传参
          kwds 字典  给func按照键值传参

      3.关闭进程池

    pool.close()
    功能: 关闭进程池

      4.回收进程池中进程

    pool.join()
    功能: 回收进程池中进程

    进程通信

    • 必要性: 进程间空间独立,资源不共享,此时在需要进程间数据传输时就需要特定的手段进行数据通信。

    • 常用进程间通信方法:消息队列,套接字等。

    • 消息队列使用

      • 通信原理: 在内存中开辟空间,建立队列模型,进程通过队列将消息存入,或者从队列取出完成进程间通信。

      • 实现方法

    from multiprocessing import Queue
    
    q = Queue(maxsize=0)
    功能: 创建队列对象
    参数:最多存放消息个数
    返回值:队列对象
    
    q.put(data,[block,timeout])
    功能:向队列存入消息
    参数:data  要存入的内容
        block  设置是否阻塞 False为非阻塞
        timeout  超时检测
    
    q.get([block,timeout])
    功能:从队列取出消息
    参数:block  设置是否阻塞 False为非阻塞
         timeout  超时检测
    返回值: 返回获取到的内容
    
    q.full()   判断队列是否为满
    q.empty()  判断队列是否为空
    q.qsize()  获取队列中消息个数
    q.close()  关闭队列

    群聊聊天室

    功能 : 类似qq群功能

    【1】 有人进入聊天室需要输入姓名,姓名不能重复

    【2】 有人进入聊天室时,其他人会收到通知:xxx 进入了聊天室

    【3】 一个人发消息,其他人会收到:xxx : xxxxxxxxxxx

    【4】 有人退出聊天室,则其他人也会收到通知:xxx退出了聊天室

    【5】 扩展功能:服务器可以向所有用户发送公告:管理员消息: xxxxxxxxx

    代码:

    """
    chat_server.py
    
    """
    
    from socket import *
    from multiprocessing import Process
    
    # 服务器地址
    HOST = "0.0.0.0"
    PORT = 8000
    ADDR = (HOST, PORT)
    
    # 存储用户信息  {name:address}
    user = {}
    
    
    # 处理进入聊天室
    def login(sock, name, addr):
        if name in user or "管理" in name:
            # 反馈结果
            sock.sendto(b"FAIL", addr)
        else:
            sock.sendto(b"OK", addr)
            # 循环通知其他人
            msg = "欢迎 %s 进入聊天室" % name
            for i in user:
                sock.sendto(msg.encode(), user[i])
            user[name] = addr  # 增加该用户
        # print(user)
    
    
    # 处理聊天
    def chat(sock, name, content):
        msg = "%s : %s" % (name, content)
        for i in user:
            # 除去本人
            if i != name:
                sock.sendto(msg.encode(), user[i])
    
    
    # 处理退出
    def exit(sock, name):
        del user[name]  # 删除用户
        msg = "%s 退出了聊天室" % name
        for i in user:
            sock.sendto(msg.encode(), user[i])
    
    
    # 处理客户端请求
    def request(sock):
        # 循环接收各种客户端请求 (总分模式)
        while True:
            # 接收所有客户端所有请求
            data, addr = sock.recvfrom(1024)
            # 对数据结构进行简单解析
            tmp = data.decode().split(' ', 2)
            if tmp[0] == "LOGIN":
                # tmp --> [LOGIN,name]
                login(sock, tmp[1], addr)
            elif tmp[0] == "CHAT":
                # tmp --> [CHAT,name,content]
                chat(sock, tmp[1], tmp[2])
            elif tmp[0] == "EXIT":
                # tmp--> [EXIT,name]
                exit(sock, tmp[1])
    
    
    # 程序启动函数
    def main():
        # UDP套接字
        sock = socket(AF_INET, SOCK_DGRAM)
        sock.bind(ADDR)
    
        # 创建子进程
        p = Process(target=request, args=(sock,))
        p.daemon = True
        p.start()
    
        # 发送管理员信息
        while True:
            content = input("管理员消息:")
            if content == 'exit':
                break
            msg = "CHAT 管理员消息 " + content
            sock.sendto(msg.encode(), ADDR)
    
    
    if __name__ == '__main__':
        main()
    """
    chat room 客户端代码
    """
    
    from socket import *
    from multiprocessing import Process
    import sys
    
    # 服务器地址
    ADDR = ('119.3.124.77', 8000)
    
    
    # 处理登录
    def login(sock):
        while True:
            # 进入聊天室
            name = input("Name:")
            # 发送姓名
            msg = "LOGIN " + name
            sock.sendto(msg.encode(), ADDR)
            # 接收结果
            result, addr = sock.recvfrom(128)
            if result.decode() == 'OK':
                print("进入聊天室")
                return name
            else:
                print("该用户已存在")
    
    
    # 接收消息
    def recv_msg(sock):
        while True:
            data, addr = sock.recvfrom(1024 * 10)
            msg = "
    %s
    发言:" % data.decode()
            print(msg, end="")  # 不换行
    
    
    # 发送消息
    def send_msg(sock, name):
        while True:
            try:
                content = input("发言:")
            except KeyboardInterrupt:
                content = "exit"
            # 输入exit表示要退出聊天室
            if content == "exit":
                msg = "EXIT " + name
                sock.sendto(msg.encode(), ADDR)
                sys.exit("您已退出聊天室")
    
            msg = "CHAT %s %s" % (name, content)
            # 给服务端发送聊天请求
            sock.sendto(msg.encode(), ADDR)
    
    
    
    # 网络连接
    def main():
        sock = socket(AF_INET, SOCK_DGRAM)
        sock.bind(("0.0.0.0",12345)) # 地址不变化
        name = login(sock)  # 进入聊天室
    
        # 创建子进程 用于接收消息
        p = Process(target=recv_msg, args=(sock,))
        p.daemon = True  # 父进程退出子进程也退出
        p.start()
        send_msg(sock, name)  # 父进程发送消息
    
    
    if __name__ == '__main__':
        main()

    线程(Thread)

       什么是线程
        线程被称为轻量级的进程,也是多任务编程方式

        也可以利用计算机的多cpu资源

        线程可以理解为进程中再开辟的分支任务

       线程特征

         一个进程中可以包含多个

         线程线程也是一个运行行为,消耗计算机资源

         一个进程中的所有线程共享这个进程的资源

         多个线程之间的运行同样互不影响各自运行

         线程的创建和销毁消耗资源远小于进程

    线程模块: threading

    创建方法:

      1.创建线程

    from threading import Thread 
    
    t = Thread()
    功能:创建线程对象
    参数:target 绑定线程函数
         args   元组 给线程函数位置传参
         kwargs 字典 给线程函数键值传参

      2.启动线程

     t.start()

      3.回收线程

     t.join([timeout])

    线程对象属性

    • t.setName() 设置线程名称

    • t.getName() 获取线程名称

    • t.is_alive() 查看线程是否在生命周期

    • t.setDaemon() 设置daemon属性值

    • t.isDaemon() 查看daemon属性值

      • daemon为True时主线程退出分支线程也退出。要在start前设置,通常不和join一起使用。

    创建线程类

    1. 创建步骤

      【1】 继承Thread类

      【2】 重写__init__方法添加自己的属性,使用super()加载父类属性

      【3】 重写run()方法

    2. 使用方法

      【1】 实例化对象

      【2】 调用start自动执行run方法

      【3】 调用join回收线程

    线程同步互斥

    • 线程通信方法: 线程间使用全局变量进行通信

    • 共享资源争夺

      • 共享资源:多个进程或者线程都可以操作的资源称为共享资源。对共享资源的操作代码段称为临界区。

      • 影响 : 对共享资源的无序操作可能会带来数据的混乱,或者操作错误。此时往往需要同步互斥机制协调操作顺序。

    • 同步互斥机制

      • 同步 : 同步是一种协作关系,为完成操作,多进程或者线程间形成一种协调,按照必要的步骤有序执行操作。

      • 互斥 : 互斥是一种制约关系,当一个进程或者线程占有资源时会进行加锁处理,此时其他进程线程就无法操作该资源,直到解锁后才能操作。

     线程Event

    from threading import Event
    
    e = Event()  创建线程event对象
    
    e.wait([timeout])  阻塞等待e被set
    
    e.set()  设置e,使wait结束阻塞
    
    e.clear() 使e回到未被设置状态
    
    e.is_set()  查看当前e是否被设置

    线程锁 Lock

    from  threading import Lock
    
    lock = Lock()  创建锁对象
    lock.acquire() 上锁  如果lock已经上锁再调用会阻塞
    lock.release() 解锁
    
    with  lock:  上锁
    ...
    ...
         with代码块结束自动解锁

    网络并发模型

      多进程并发模型

    • 创建网络套接字用于接收客户端请求

    • 等待客户端连接

    • 客户端连接,则创建新的进程具体处理客户端请求

    • 主进程继续等待其他客户端连接

    • 如果客户端退出,则销毁对应的进程

    """
        多进程并发模型
      process_server
    """
    from signal import *
    from socket import *
    from multiprocessing import Process
    
    # 服务器地址
    HOST = "0.0.0.0"
    PORT = 8888
    ADDR = (HOST, PORT)
    
    
    # 实现具体的业务功能,客户端请求都在这里处理
    def handle(connfd):
        # 与客户端配合测试
        while True:
            data = connfd.recv(1024)
            if not data:
                break
            print(data.decode())
            connfd.send(b"ok")
        connfd.close()
    
    
    # 函数中编写并发服务
    def main():
        sock = socket()  # tcp套接字
        sock.bind(ADDR)
        sock.listen(5)
    
        print("Listen the port %d" % PORT)
        signal(SIGCHLD, SIG_IGN)  # 处理僵尸进程
    
        while True:
            # 循环接收客户端连接
            try:
                connfd, addr = sock.accept()
                print("Connect from", addr)
            except KeyboardInterrupt:
                # 退出服务
                sock.close()
                break
            # 为连接的客户端创建新进程
            p = Process(target=handle, args=(connfd,))
            p.daemon = True  # 客户端随服务端退出
            p.start()
    
    
    if __name__ == '__main__':
        main()
    """
    tcp_client
    """
    
    from socket import *
    
    # 服务器地址
    server_addr = ("127.0.0.1",8888)
    
    tcp_socket = socket()
    tcp_socket.connect(server_addr)
    
    # 收发消息
    while True:
        msg = input(">>")
        if not msg:
            break
        tcp_socket.send(msg.encode())
        data = tcp_socket.recv(1024)
        print("从服务器收到:",data.decode())
    
    tcp_socket.close()

    多线程并发模型

    • 创建网络套接字用于接收客户端请求

    • 等待客户端连接

    • 客户端连接,则创建新的线程具体处理客户端请求

    • 主线程继续等待其他客户端连接

    • 如果客户端退出,则销毁对应的线程

    """
        多线程并发模型
    
        thread_server
    """
    from socket import *
    from threading import Thread
    
    # 服务器地址
    HOST = "0.0.0.0"
    PORT = 8888
    ADDR = (HOST, PORT)
    
    
    # 实现具体的业务功能,客户端请求都在这里处理
    class MyThread(Thread):
        def __init__(self, connfd):
            self.connfd = connfd
            super().__init__()
    
        def run(self):
            # 与客户端配合测试
            while True:
                data = self.connfd.recv(1024)
                if not data:
                    break
                print(data.decode())
                self.connfd.send(b"ok")
            self.connfd.close()
    
    
    # 函数中编写并发服务
    def main():
        sock = socket()  # tcp套接字
        sock.bind(ADDR)
        sock.listen(5)
    
        print("Listen the port %d" % PORT)
    
        while True:
            # 循环接收客户端连接
            try:
                connfd, addr = sock.accept()
                print("Connect from", addr)
            except KeyboardInterrupt:
                # 退出服务
                sock.close()
                break
            # 为连接的客户端创建新进程
            t = MyThread(connfd)
            t.setDaemon(True)  # 客户端随服务端退出
            t.start()
    
    
    if __name__ == '__main__':
        main()

    ftp 文件服务器

    【1】 分为服务端和客户端,要求可以有多个客户端同时操作。

    【2】 客户端可以查看服务器文件库中有什么文件。

    【3】 客户端可以从文件库中下载文件到本地。

    【4】 客户端可以上传一个本地文件到文件库。

    【5】 使用print在客户端打印命令输入提示,引导操作

    """
    ftp文件管理服务端
    多线程 tcp 并发
    """
    from socket import *
    from threading import Thread
    import os
    from time import sleep
    
    # 服务器地址
    HOST = "0.0.0.0"
    PORT = 8888
    ADDR = (HOST, PORT)
    
    # 文件库位置
    FTP = "/home/tarena/FTP/"
    
    
    # 实现具体的业务功能,客户端请求都在这里处理
    class FTPServer(Thread):
        def __init__(self, connfd):
            self.connfd = connfd
            super().__init__()
    
        # 处理请求文件列表
        def do_list(self):
            # 判断文件库是否为空
            files = os.listdir(FTP)
            if not files:
                self.connfd.send(b"FAIL")  # 失败
            else:
                self.connfd.send(b"OK")
                sleep(0.1)
                # 一次发送所有文件名
                data = "
    ".join(files)
                self.connfd.send(data.encode())
                sleep(0.1)
                self.connfd.send(b"##")
    
        # 处理上传
        def do_put(self, filename):
            # 判断文件是否已存在
            if os.path.exists(FTP + filename):
                self.connfd.send(b"FAIL")
                return
            else:
                self.connfd.send(b"OK")
                # 接收文件
                f = open(FTP + filename, 'wb')
                while True:
                    data = self.connfd.recv(1024)
                    if data == b"##":
                        break
                    f.write(data)
                f.close()
    
        # 处理下载
        def do_get(self, filename):
            try:
                f = open(FTP + filename, 'rb')
            except:
                # 文件不存在
                self.connfd.send(b"FAIL")
                return
            else:
                self.connfd.send(b"OK")
                sleep(0.1)
                # 发送文件
                while True:
                    data = f.read(1024)
                    if not data:
                        break
                    self.connfd.send(data)
                sleep(0.1)
                self.connfd.send(b"##")
                f.close()
    
        # 线程启动方法
        def run(self):
            while True:
                # 接收某一个各类请求
                data = self.connfd.recv(1024).decode()
                # print(data)
                if not data or data == "EXIT":
                    break
                elif data == "LIST":
                    self.do_list()
                elif data[:4] == "STOR":
                    filename = data.split(' ')[-1]
                    self.do_put(filename)
                elif data[:4] == "RETR":
                    filename = data.split(' ')[-1]
                    self.do_get(filename)
            self.connfd.close()
    
    
    # 函数中搭建并发结构
    def main():
        sock = socket()  # tcp套接字
        sock.bind(ADDR)
        sock.listen(5)
    
        print("Listen the port %d" % PORT)
    
        while True:
            # 循环接收客户端连接
            try:
                connfd, addr = sock.accept()
                print("Connect from", addr)
            except KeyboardInterrupt:
                # 退出服务
                sock.close()
                break
            # 使用自定义线程类为连接的客户端创建新线程
            t = FTPServer(connfd)
            t.setDaemon(True)  # 客户端随服务端退出
            t.start()
    
    
    if __name__ == '__main__':
        main()
    """
    ftp 文件服务 客户端
    c / s  连接   发请求   获取结果
    """
    from socket import *
    import sys
    from time import sleep
    
    # 服务端地址
    ADDR = ("127.0.0.1", 8888)
    
    
    # 发起请求的所有功能
    class FTPClient:
        def __init__(self, sock):
            self.sock = sock
    
        # 请求文件列表
        def do_list(self):
            self.sock.send(b"LIST")  # 发送请求
            result = self.sock.recv(128).decode()  # 等回复
            # 根据不同结果分情况处理
            if result == "OK":
                # 接收文件列表
                while True:
                    file = self.sock.recv(1024).decode()
                    if file == '##':
                        break
                    print(file)
            else:
                print("文件库为空")
    
        # 退出
        def do_exit(self):
            self.sock.send(b"EXIT")
            self.sock.close()
            sys.exit("谢谢使用")
    
        # 处理上传
        def do_put(self, file):
            # 测一下这个文件是否存在
            try:
                f = open(file, 'rb')
            except:
                print("文件不存在")
                return
            # 防止file带有文件路径,提取文件名
            filename = file.split("/")[-1]
            data = "STOR " + filename
            self.sock.send(data.encode())  # 发请求
            result = self.sock.recv(128).decode()  # 等待回复
            if result == 'OK':
                # 上传文件 读--》发送
                while True:
                    data = f.read(1024)
                    if not data:
                        break
                    self.sock.send(data)
                sleep(0.1)
                self.sock.send(b"##")
                f.close()
            else:
                print("该文件已存在")
    
        # 处理下载
        def do_get(self, file):
            data = "RETR " + file
            self.sock.send(data.encode())  # 发请求
            result = self.sock.recv(128).decode()  # 等待回复
            if result == 'OK':
                # 接收文件
                f = open(file, 'wb')
                while True:
                    data = self.sock.recv(1024)
                    if data == b"##":
                        break
                    f.write(data)
                f.close()
            else:
                print("该文件不存在")
    
    
    # 启动函数
    def main():
        sock = socket()
        sock.connect(ADDR)
    
        # 实例化对象用于调用方法
        ftp = FTPClient(sock)
    
        while True:
            print("""
            ============ 命令选项 =============
                          list 
                        get  file
                        put  file
                          exit
            ==================================
            """)
            cmd = input("请输入命令:")
    
            if cmd == "list":
                ftp.do_list()
            elif cmd == 'exit':
                ftp.do_exit()
            elif cmd[:3] == 'put':
                file = cmd.split(' ')[-1]
                ftp.do_put(file)
            elif cmd[:3] == 'get':
                file = cmd.split(' ')[-1]
                ftp.do_get(file)
            else:
                print("请输入正确命令")
    
    
    if __name__ == '__main__':
        main()

     IO并发模型

     IO概述

    • 什么是IO

      在程序中存在读写数据操作行为的事件均是IO行为,比如终端输入输出 ,文件读写,数据库修改和网络消息收发等。

    • 程序分类

      • IO密集型程序:在程序执行中有大量IO操作,而运算操作较少。消耗cpu较少,耗时长。

      • 计算密集型程序:程序运行中运算较多,IO操作相对较少。cpu消耗多,执行速度快,几乎没有阻塞。

    • IO分类:阻塞IO ,非阻塞IO,IO多路复用,异步IO等。

    阻塞IO

    • 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态。

    • 效率:阻塞IO效率很低。但是由于逻辑简单所以是默认IO行为。

    • 阻塞情况

      • 因为某种执行条件没有满足造成的函数阻塞 e.g. accept input recv

      • 处理IO的时间较长产生的阻塞状态 e.g. 网络传输,大文件读写

    非阻塞IO

    • 定义 :通过修改IO属性行为,使原本阻塞的IO变为非阻塞的状态。

    • 设置套接字为非阻塞IO

    sockfd.setblocking(bool)
    功能:设置套接字为非阻塞IO
    参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞
    • 超时检测 :设置一个最长阻塞时间,超过该时间后则不再阻塞等待。
    sockfd.settimeout(sec)
    功能:设置套接字的超时时间
    参数:设置的时间

    IO多路复用

    • 定义

      同时监控多个IO事件,当哪个IO事件准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率。

    • 具体方案

      • select方法 : windows linux unix

      • poll方法: linux unix

      • epoll方法: linux

       select 方法

    rs, ws, xs=select(rlist, wlist, xlist[, timeout])
    功能: 监控IO事件,阻塞等待IO发生
    参数:rlist  列表  读IO列表,添加等待发生的或者可读的IO事件
          wlist  列表  写IO列表,存放要可以主动处理的或者可写的IO事件
          xlist  列表 异常IO列表,存放出现异常要处理的IO事件
          timeout  超时时间
    
    返回值: rs 列表  rlist中准备就绪的IO
            ws 列表  wlist中准备就绪的IO
           xs 列表  xlist中准备就绪的IO
    """
        基于select的  IO网络并发模型
    
        IO 多路复用与非阻塞搭配
    
        重点代码!!!
    """
    from socket import *
    from select import select
    
    # 网络地址
    HOST = "0.0.0.0"
    PORT = 8888
    ADDR = (HOST, PORT)
    
    # 创建监听套接字
    sockfd = socket()
    sockfd.bind(ADDR)
    sockfd.listen(5)
    
    # 设置套接字为非阻塞IO
    sockfd.setblocking(False)
    
    # 初始只有监听套接字,先关注他
    rlist = [sockfd]  # 客户端连接
    wlist = []
    xlist = []
    
    # 循环(开始)监控IO对象
    while True:
        rs, ws, xs = select(rlist, wlist, xlist)
        # 处理就绪的IO   (不能出现IO阻塞)
        for r in rs:
            # 有客户端连接
            if r is sockfd:
                connfd, addr = rs.accept()
                print("Connect from", addr)
                # 将客户端连接套接字也监控起来
                connfd.setblocking(False)
                rlist.append(connfd)
            else:
                # 某个客户端发消息  connfd 就绪
                data = r.recv(1024).decode()
                # 客户端退出处理
                if not data:
                    rlist.remove(r)
                    r.close()
                    continue
                print(data)
                # r.send(b"OK")
                wlist.append(r)  # 存入写列表
    
        for w in ws:
            w.send(b"OK")
            wlist.remove(w)

      poll方法

    p = select.poll()
    功能 : 创建poll对象
    返回值: poll对象
    p.register(fd,event)   
    功能: 注册关注的IO事件
    参数:fd  要关注的IO
          event  要关注的IO事件类型
               常用类型:POLLIN  读IO事件(rlist)
                  POLLOUT 写IO事件 (wlist)
                  POLLERR 异常IO  (xlist)
                  POLLHUP 断开连接 
              e.g. p.register(sockfd,POLLIN|POLLERR)
    
    p.unregister(fd)
    功能:取消对IO的关注
    参数:IO对象或者IO对象的fileno
    events = p.poll()
    功能: 阻塞等待监控的IO事件发生
    返回值: 返回发生的IO
            events格式  [(fileno,event),()....]
            每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型
    """
        训练: 使用poll方法完成与select_server相同的功能性代码
    """
    from socket import *
    from select import *
    
    # 网络地址
    HOST = "0.0.0.0"
    PORT = 8888
    ADDR = (HOST, PORT)
    
    # 创建监听套接字
    sockfd = socket()
    sockfd.bind(ADDR)
    sockfd.listen(5)
    
    # 设置套接字为非阻塞IO
    sockfd.setblocking(False)
    
    p = poll()  # 生成poll对象
    
    # 查找字典 fileno --> io object
    map = {sockfd.fileno(): sockfd}
    
    # 初始监听套接字,先关注他
    p.register(sockfd, POLLIN)
    
    # 循环监控IO对象
    while True:
        # events --> [(fileno,event),()]
        events = p.poll()
        # 遍历events 处理就绪的IO
        for fd, event in events:
            # 分类讨论
            if fd == sockfd.fileno():
                connfd, addr = map[fd].accept()
                print("Connect from", addr)
                # 将客户端连接套接字也监控起来
                connfd.setblocking(False)
                p.register(connfd, POLLIN | POLLERR)
                map[connfd.fileno()] = connfd  # 维护字典
            elif event == POLLIN:
                # 某个客户端发消息 connfd 就绪
                data = map[fd].recv(1024).decode()
                # 客户端退出处理
                if not data:
                    p.unregister(fd)  # 不在关注
                    map[fd].close()
                    del map[fd]  # 从字典删除
                    continue
                print(data)
                map[fd].send(b"OK")

      epoll方法

    使用方法 : 基本与poll相同

    生成对象改为 epoll()

    将所有事件类型改为EPOLL类型

    • epoll特点

      • epoll 效率比select poll要高

      • epoll 监控IO数量比select要多

      • epoll 的触发方式比poll要多 (EPOLLET边缘触发)

    """
        训练: 使用epoll方法完成与select_server相同的功能性代码
    
        基于epoll的 IO网络模型
    
        IO 多路复用与非阻塞搭配
    
        重点代码!!!
    """
    from socket import *
    from select import *
    
    # 网络地址
    HOST = "0.0.0.0"
    PORT = 8888
    ADDR = (HOST, PORT)
    
    # 创建监听套接字
    sockfd = socket()
    sockfd.bind(ADDR)
    sockfd.listen(5)
    
    # 设置套接字为非阻塞IO
    sockfd.setblocking(False)
    
    p = epoll()  # 生成epoll对象
    
    # 查找字典 fileno --> io object
    map = {sockfd.fileno(): sockfd}
    
    # 初始监听套接字,先关注他
    p.register(sockfd, EPOLLIN)
    
    # 循环监控IO对象
    while True:
        # events --> [(fileno,event),()]
        events = p.poll()
        # 遍历events 处理就绪的IO
        for fd, event in events:
            # 分类讨论
            if fd == sockfd.fileno():
                connfd, addr = map[fd].accept()
                print("Connect from", addr)
                # 将客户端连接套接字也监控起来
                connfd.setblocking(False)
                p.register(connfd, EPOLLIN | EPOLLERR)
                map[connfd.fileno()] = connfd  # 维护字典
            elif event == EPOLLIN:
                # 某个客户端发消息 connfd 就绪
                data = map[fd].recv(1024).decode()
                # 客户端退出处理
                if not data:
                    p.unregister(fd)  # 不在关注
                    map[fd].close()
                    del map[fd]  # 从字典删除
                    continue
                print(data)
                # map[fd].send(b"OK")
                p.unregister(fd)
                p.register(fd, EPOLLOUT)
            elif event == EPOLLOUT:
                map[fd].send(b"OK")
                p.unregister(fd)
                p.register(fd, EPOLLIN)

    IO并发模型

    利用IO多路复用等技术,同时处理多个客户端IO请求。

    • 优点 : 资源消耗少,能同时高效处理多个IO行为

    • 缺点 : 只针对处理并发产生的IO事件

    • 适用情况:HTTP请求,网络传输等都是IO行为,可以通过IO多路复用监控多个客户端的IO请求。

    • 并发服务实现过程

      【1】将关注的IO准备好,通常设置为非阻塞状态。

      【2】通过IO多路复用方法提交,进行IO监控。

      【3】阻塞等待,当监控的IO有发生时,结束阻塞。

      【4】遍历返回值列表,确定就绪IO事件。

      【5】处理发生的IO事件。

      【6】继续循环监控IO发生。

     

    Live what we love, do what we do, follow the heart, and do not hesitate.
  • 相关阅读:
    IT面试技巧(2)
    mySQL学习入门教程——4.内置函数
    weight decay (权值衰减)
    c++读取文件目录
    caffe 卷积层的运算
    一个物体多个标签的问题
    python caffe 在师兄的代码上修改成自己风格的代码
    caffe 细节
    vim让一些不可见的字符显示出来吧
    python 读写文件
  • 原文地址:https://www.cnblogs.com/failan/p/13866088.html
Copyright © 2011-2022 走看看