zoukankan      html  css  js  c++  java
  • 并发网络通信模型

    常见模型分类

    循环服务器模型 :循环接收客户端请求,处理请求。同一时刻只能处理一个请求,处理完毕后再

    处理下一个。

    • 优点:实现简单,占用资源少
    • 缺点:无法同时处理多个客户端请求
    • 适用情况:处理的任务可以很快完成,客户端无需长期占用服务端程序。udp比tcp更适合循环。

    IO并发模型:利用IO多路复用,异步IO等技术,同时处理多个客户端IO请求。

    • 优点 : 资源消耗少,能同时高效处理多个IO行为
    • 缺点 : 只能处理并发产生的IO事件,无法处理cpu计算
    • 适用情况:HTTP请求,网络传输等都是IO行为。

    多进程/线程网络并发模型:每当一个客户端连接服务器,就创建一个新的进程/线程为该客户端服务,客户端退出时再销毁该进程/线程。

    • 优点:能同时满足多个客户端长期占有服务端需求,可以处理各种请求。
    • 缺点: 资源消耗较大
    • 适用情况:客户端同时连接量较少,需要处理行为较复杂情况。

    基于fork的多进程网络并发模型

    1. 创建监听套接字
    2. 等待接收客户端请求
    3. 客户端连接创建新的进程处理客户端请求
    4. 原进程继续等待其他客户端连接
    5. 如果客户端退出,则销毁对应的进程
    from socket import *
    import os
    import signal
    
    # 创建监听套接字
    HOST = '0.0.0.0'
    PORT = 8888
    ADDR = (HOST,PORT)
    
    # 客户端服务函数
    def handle(c):
      while True:
        data = c.recv(1024)
        if not data:
          break
        print(data.decode())
        c.send(b'OK')
      c.close()
    
    s = socket()  # tcp套接字
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)   # 设置套接字端口重用
    s.bind(ADDR)
    s.listen(3)
    
    signal.signal(signal.SIGCHLD,signal.SIG_IGN)    # 处理僵尸进程
    
    print("Listen the port %d..." % PORT)
    
    # 循环等待客户端连接
    while True:
      try:
        c,addr = s.accept()
      except KeyboardInterrupt:
        os._exit(0)
      except Exception as e:
        print(e)
        continue
    
      # 创建子进程处理这个客户端
      pid = os.fork()
      if pid == 0:  # 处理客户端请求
        s.close()
        handle(c)
        os._exit(0)  # handle处理完客户端请求子进程也退出
    
      # 无论出错或者父进程都要循环回去接受请求
      # c对于父进程没用
      c.close() 

    基于threading的多线程网络并发

    1. 创建监听套接字
    2. 循环接收客户端连接请求
    3. 当有新的客户端连接创建线程处理客户端请求
    4. 主线程继续等待其他客户端连接
    5. 当客户端退出,则对应分支线程退出
    from socket import *
    from threading import Thread
    import sys
    
    # 创建监听套接字
    HOST = '0.0.0.0'
    PORT = 8888
    ADDR = (HOST,PORT)
    
    # 处理客户端请求
    def handle(c):
      while True:
        data = c.recv(1024)
        if not data:
          break
        print(data.decode())
        c.send(b'OK')
      c.close()
    
    s = socket()  # tcp套接字
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind(ADDR)
    s.listen(3)
    
    print("Listen the port %d..."%PORT)
    # 循环等待客户端连接
    while True:
      try:
        c,addr = s.accept()
      except KeyboardInterrupt:
        sys.exit("服务器退出")
      except Exception as e:
        print(e)
        continue
    
      # 创建线程处理客户端请求
      t = Thread(target=handle, args=(c,))
      t.setDaemon(True)   # 父进程结束则所有进程终止
      t.start()

    ftp 文件服务器

    项目功能 :

    * 客户端有简单的页面命令提示: 功能包含:

    1. 查看服务器文件库中的文件列表(普通文件)
    2. 可以下载其中的某个文件到本地
    3. 可以上传客户端文件到服务器文件库 

    * 服务器需求 :

    1. 允许多个客户端同时操作
    2. 每个客户端可能回连续发送命令 

    技术分析:

    1. tcp套接字更适合文件传输
    2. 并发方案 ---》 fork 多进程并发
    3. 对文件的读写操作
    4. 获取文件列表 ----》 os.listdir() 

    粘包的处理

    整体结构设计

    1. 服务器功能封装在类中(上传,下载,查看列表)
    2. 创建套接字,流程函数调用 main()
    3. 客户端负责发起请求,接受回复,展示

    服务端负责接受请求,逻辑处理

    from socket import *
    from threading import Thread
    import os
    import time
    
    # 全局变量
    HOST = '0.0.0.0'
    PORT = 8080
    ADDR = (HOST,PORT)
    FTP = "/home/tarena/FTP/"  # 文件库位置
    
    # 创建文件服务器服务端功能类
    class FTPServer(Thread):
      def __init__(self,connfd):
        self.connfd = connfd
        super().__init__()
    
      def do_list(self):
        # 获取文件列表
        files = os.listdir(FTP)
        if not files:
          self.connfd.send("文件库为空".encode())
          return
        else:
          self.connfd.send(b'OK')
          time.sleep(0.1)  # 防止和后面发送内容粘包
    
        # 拼接文件列表
        files_ = ""
        for file in files:
          if file[0] != '.' and 
                  os.path.isfile(FTP+file):
            files_ += file + '
    '
        self.connfd.send(files_.encode())
    
      def do_get(self,filename):
        try:
          fd = open(FTP+filename,'rb')
        except Exception:
          self.connfd.send("文件不存在".encode())
          return
        else:
          self.connfd.send(b'OK')
          time.sleep(0.1)
        # 文件发送
        while True:
          data = fd.read(1024)
          if not data:
            time.sleep(0.1)
            self.connfd.send(b'##')
            break
          self.connfd.send(data)
    
      # 循环接收客户端请求
      def run(self):
        while True:
          data = self.connfd.recv(1024).decode()
          if not data or data == 'Q':
            return 
          elif data == 'L':
            self.do_list()
          elif data[0] == 'G':   # G filename
            filename = data.split(' ')[-1]
            self.do_get(filename)
    
    # 网络搭建
    def main():
      # 创建套接字
      sockfd = socket()
      sockfd.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
      sockfd.bind(ADDR)
      sockfd.listen(3)
      print("Listen the port %d..."%PORT)
      while True:
        try:
          connfd,addr = sockfd.accept()
          print("Connect from",addr)
        except KeyboardInterrupt:
          print("服务器程序退出")
          return
        except Exception as e:
          print(e)
          continue
    
        # 创建新的线程处理客户端
        client = FTPServer(connfd)
        client.setDaemon(True)
        client.start()   # 运行run方法
    
    
    if __name__ == "__main__":
      main()
    ftp_sever
    from socket import *
    import sys
    
    ADDR = ('127.0.0.1',8080) # 服务器地址
    
    # 客户端功能处理类
    class FTPClient:
      def __init__(self,sockfd):
        self.sockfd = sockfd
    
      def do_list(self):
        self.sockfd.send(b'L')  # 发送请求
        # 等待回复
        data = self.sockfd.recv(128).decode()
        if data == 'OK':
          # 一次接收文件列表字符串
          data = self.sockfd.recv(4096)
          print(data.decode())
        else:
          print(data)
    
      def do_get(self,filename):
        # 发送请求
        self.sockfd.send(('G '+filename).encode())
        # 等待回复
        data = self.sockfd.recv(128).decode()
        if data == 'OK':
          fd = open(filename,'wb')
          # 接收文件
          while True:
            data = self.sockfd.recv(1024)
            if data == b'##':
              break
            fd.write(data)
          fd.close()
        else:
          print(data)
    
      def do_quit(self):
        self.sockfd.send(b'Q')
        self.sockfd.close()
        sys.exit("谢谢使用")
    
    # 创建客户端网络
    def main():
      sockfd = socket()
      try:
        sockfd.connect(ADDR)
      except Exception as e:
        print(e)
        return
    
      ftp = FTPClient(sockfd) # 实例化对象
    
      # 循环发送请求
      while True:
        print("
    =========命令选项==========")
        print("****      list         ****")
        print("****    get file       ****")
        print("****    put file       ****")
        print("****      quit         ****")
        print("=============================")
    
        cmd = input("输入命令:")
    
        if cmd.strip() == 'list':
          ftp.do_list()
        elif cmd[:3] == 'get':
          # get filename
          filename = cmd.strip().split(' ')[-1]
          ftp.do_get(filename)
        elif cmd[:3] == 'put':
          # put ../filename
          filename = cmd.strip().split(' ')[-1]
          ftp.do_put(filename)
        elif cmd.strip() == 'quit':
          ftp.do_quit()
        else:
          print("请输入正确命令")
    
    
    
    if __name__ == "__main__":
      main()
    ftp_client

    IO并发

    定义:在内存中数据交换的操作被定义为IO操作,IO------输入输出

      内存和磁盘进行数据交换: 文件的读写 数据库更新
      内存和终端数据交换 : input  print
                  sys.stdin  sys.stdout   sys.stderr
      内存和网络数据的交换: 网络连接 recv send recvfrom

      IO密集型程序 : 程序执行中有大量的IO操作,而较少的cpu运算操作。消耗cpu较少,IO运行时间长

      CPU(计算)密集型程序:程序中存在大量的cpu运算,IO操作相对较少,消耗cpu大。

    IO分类

    IO分为:阻塞IO、非阻塞IO、IO多路复用、事件驱动IO、异步IO

    阻塞IO 

    • 定义: 在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态。
    • 效率: 阻塞IO是效率很低的一种IO。但是由于逻辑简单所以是默认IO行为。

    阻塞情况:

    • 因为某种执行条件没有满足造成的函数阻塞  e.g. accept input recv
    • 处理IO的时间较长产生的阻塞状态  e.g. 网络传输, 大文件读写

    非阻塞IO

    定义 : 通过修改IO属性行为, 使原本阻塞的IO变为非阻塞的状态。

    • 设置套接字为非阻塞IO
      • sockfd.setblocking(bool)
      • 功能: 设置套接字为非阻塞IO
      • 参数: 默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞
    • 超时检测 :设置一个最长阻塞时间,超过该时间后则不再阻塞等待。
      • sockfd.settimeout(sec)
      • 功能:设置套接字的超时时间
      • 参数:设置的时间

    IO多路复用

    定义 通过一个监测,可以同时监控多个IO事件的行为。当哪个IO事件可以执行,即让这个IO事件发生。

    rs, ws, xs = select(rlist, wlist, xlist[, timeout])  监控IO事件,阻塞等待监控的IO时间发生

    参数 :

    • rlist  列表,存放(被动)等待处理的IO (接收)
    • wlist  列表,存放主动处理的IO(发送)
    • xlist  列表,存放出错,希望去处理的IO(异常)
    • timeout   超时检测

    返回值:

    • rs  列表  rlist中准备就绪的IO
    • ws  列表  wlist中准备就绪的IO
    • xs  列表  xlist中准备就绪的IO

    select 实现tcp服务

    1. 将关注的IO放入对应的监控类别列表
    2. 通过select函数进行监控
    3. 遍历select返回值列表,确定就绪IO事件
    4. 处理发生的IO事件
    from socket import *
    from select import select
    
    # 创建一个监听套接字作为关注的IO
    s = socket()
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind(('0.0.0.0',8888))
    s.listen(3)
    
    # 设置关注列表
    rlist = [s]
    wlist = []
    xlist = [s]
    
    # 循环监控IO
    while True:
      rs,ws,xs = select(rlist,wlist,xlist)
      # 遍历三个返回列表,处理IO
      for r in rs:
        # 根据遍历到IO的不同使用if分情况处理
        if r is s:
          c,addr = r.accept()
          print("Connect from",addr)
          rlist.append(c) # 增加新的IO事件
        # else为客户端套接字就绪情况
        else:
          data = r.recv(1024)
          # 客户端退出
          if not data:
            rlist.remove(r) # 从关注列表移除
            r.close()
            continue # 继续处理其他就绪IO
          print("Receive:",data.decode())
          # r.send(b'OK')
          # 我们希望主动处理这个IO对象
          wlist.append(r)
    
      for w in ws:
        w.send(b'OK')
        wlist.remove(w) # 使用后移除
    
      for x in xs:
        pass 

    注意: 

    • wlist中如果存在IO事件,则select立即返回给ws
    • 处理IO过程中不要出现死循环占有服务端的情况
    • IO多路复用消耗资源较少,效率较高

    扩展: 位运算

    将整数转换为二进制, 按照二进制位进行运算符操作
      & 按位与   | 按位或   ^ 按位异或    << 左移   >> 右移
      11 1011    14 1110
      (11 & 14    1010)   (11 | 14    1111)   (11 ^ 14    0101 )
      11 << 2 ===> 44 右侧补0    14 >> 2 ===> 3 挤掉右侧的数字
      使用 : 1. 在做底层硬件时操作寄存器
          2. 做标志位的过滤

    poll方法实现IO多路复用

    p = select.poll()    创建poll对象

    p.register(fd,event)  注册关注的IO事件

    • fd       要关注的IO
    • event 要关注的IO事件类型

      常用类型:

      1. POLLIN 读IO事件(rlist)
      2. POLLOUT 写IO事件 (wlist)
      3. POLLERR 异常IO (xlist)
      4. POLLHUP 断开连接

          e.g. p.register(sockfd,POLLIN|POLLERR)

    p.unregister(fd)    取消对IO的关注

    • 参数: IO对象或者IO对象的fileno

    events = p.poll()       

    • 功能:    阻塞等待监控的IO事件发生
    • 返回值: 返回发生的IO事件

        events 是一个列表 [(fileno,evnet),(),()....]

        每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型

    poll_server 步骤

    1. 创建套接字
    2. 将套接字register
    3. 创建查找字典,并维护
    4. 循环监控IO发生
    5. 处理发生的IO
    from socket import *
    from select import *
    
    # 创建套接字
    s = socket()
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind(('0.0.0.0',8888))
    s.listen(3)
    
    # 创建poll对象关注s
    p = poll()
    
    # 建立查找字典,用于通过fileno查找IO对象
    fdmap = {s.fileno():s}
    
    # 关注s
    p.register(s,POLLIN|POLLERR)
    
    # 循环监控
    while True:
      events = p.poll()
      # 循环遍历发生的事件 fd-->fileno
      for fd,event in events:
        # 区分事件进行处理
        if fd == s.fileno():
          c,addr = fdmap[fd].accept()
          print("Connect from",addr)
          # 添加新的关注IO
          p.register(c,POLLIN|POLLERR)
          fdmap[c.fileno()] = c # 维护字典
        # 按位与判定是POLLIN就绪
        elif event & POLLIN:
          data = fdmap[fd].recv(1024)
          if not data:
            p.unregister(fd) # 取消关注
            fdmap[fd].close()
            del fdmap[fd]  # 从字典中删除
            continue
          print("Receive:",data.decode())
          fdmap[fd].send(b'OK')

    epoll方法

    1. 使用方法 : 基本与poll相同

    • 生成对象改为 epoll()
    • 将所有事件类型改为EPOLL类型

    2. epoll特点

    • epoll 效率比select poll要高
    • epoll 监控IO数量比select要多
    • epoll 的触发方式比poll要多 (EPOLLET边缘触发)
    from socket import *
    from select import *
    
    # 创建套接字
    s = socket()
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind(('0.0.0.0',8888))
    s.listen(3)
    
    # 创建epoll对象关注s
    ep = epoll()
    
    # 建立查找字典,用于通过fileno查找IO对象
    fdmap = {s.fileno():s}
    
    # 关注s
    ep.register(s,EPOLLIN|EPOLLERR)
    
    # 循环监控
    while True:
      events = ep.poll()
      # 循环遍历发生的事件 fd-->fileno
      for fd,event in events:
        print("亲,你有IO需要处理哦")
        # 区分事件进行处理
        if fd == s.fileno():
          c,addr = fdmap[fd].accept()
          print("Connect from",addr)
          # 添加新的关注IO
          # 将触发方式变为边缘触发
          ep.register(c,EPOLLIN|EPOLLERR|EPOLLET)
          fdmap[c.fileno()] = c # 维护字典
        # 按位与判定是EPOLLIN就绪
        # elif event & EPOLLIN:
        #   data = fdmap[fd].recv(1024)
        #   if not data:
        #     ep.unregister(fd) # 取消关注
        #     fdmap[fd].close()
        #     del fdmap[fd]  # 从字典中删除
        #     continue
        #   print("Receive:",data.decode())
        #   fdmap[fd].send(b'OK')

     

  • 相关阅读:
    深入浅出Mybatis系列(一)Mybatis入门
    LinkedList其实就那么一回事儿之源码分析
    深入浅出Mybatis系列(八)mapper映射文件配置之select、resultMap
    ArrayList其实就那么一回事儿之源码浅析
    springMVC 源码解读系列(一)初始化
    深入浅出Mybatis系列(三)配置详解之properties与environments(mybatis源码篇)
    深入浅出Mybatis系列(四)配置详解之typeAliases别名(mybatis源码篇)
    深入浅出Mybatis系列(六)objectFactory、plugins、mappers简介与配置
    深入浅出Mybatis系列(二)配置简介(mybatis源码篇)
    深入浅出Mybatis系列(七)mapper映射文件配置之insert、update、delete
  • 原文地址:https://www.cnblogs.com/LXP-Never/p/9445395.html
Copyright © 2011-2022 走看看