1.网络编程
OSI 7层模型
TCP/IP模型
通信地址
IP地址相关命令
ifconfig:查看Linux系统下计算机的IP地址
ping[ip]: 查看计算机的连通性.
UDP传输方法
套接字(Socket):实现网络编程进行数据传输的一种技术手段,网络上各种各样的网络服务大多都是基于 Socket 来完成通信的。
模块: import socker
UDP套接字编程
sockfd=socket.socket(socket_family,socket_type,proto=0)
功能:创建套接字
参数:socket_family 网络地址类型 AF_INET表示ipv4
socket_type 套接字类型 SOCK_DGRAM 表示udp套接字 (也叫数据报套接字)
proto 通常为0 选择子协议
返回值: 套接字对象
绑定地址
本地地址 : 'localhost' , '127.0.0.1'
网络地址 : '172.40.91.185' (通过ifconfig查看)
自动获取地址: '0.0.0.0'
sockfd.bind(addr) 功能: 绑定本机网络地址 参数: 二元元组 (ip,port) ('0.0.0.0',8888)
消息收发
data,addr = sockfd.recvfrom(buffersize) 功能: 接收UDP消息 参数: 每次最多接收多少字节 返回值: data 接收到的内容 addr 消息发送方地址 n = sockfd.sendto(data,addr) 功能: 发送UDP消息 参数: data 发送的内容 bytes格式 addr 目标地址 返回值:发送的字节数
关闭套接字
sockfd.close()
功能:关闭套接字
TCP传输方法
三次握手(建立连接)
客户端向服务器发送消息报文请求连接
服务器收到请求后,回复报文确定可以连接
客户端收到回复,发送最终报文连接建立
四次挥手(断开连接)
主动方发送报文请求断开连接
被动方收到请求后,立即回复,表示准备断开
被动方准备就绪,再次发送报文表示可以断开
主动方收到确定,发送最终报文完成断开
TCP服务端
1.创建套接字
sockfd=socket.socket(socket_family,socket_type,proto=0)
功能:创建套接字
参数:socket_family 网络地址类型 AF_INET表示ipv4
socket_type 套接字类型 SOCK_STREAM 表示tcp套接字 (也叫流式套接字)
proto 通常为0 选择子协议
返回值: 套接字对象
2.绑定地址
3.设置监听
sockfd.listen(n)
功能 : 将套接字设置为监听套接字,确定监听队列大小
参数 : 监听队列大小
4.处理客户端连接请求
connfd,addr = sockfd.accept()
功能: 阻塞等待处理客户端请求
返回值: connfd 客户端连接套接字
addr 连接的客户端地址
5.消息收发
data = connfd.recv(buffersize) 功能 : 接受客户端消息 参数 :每次最多接收消息的大小 返回值: 接收到的内容 n = connfd.send(data) 功能 : 发送消息 参数 :要发送的内容 bytes格式 返回值: 发送的字节数
6.关闭套接字
TCP客户端
创建套接字
请求连接
sockfd.connect(server_addr)
功能:连接服务器
参数:元组 服务器地址
收发消息
注意: 防止两端都阻塞,recv send要配合
关闭套接字
TCP套接字细节
1.tcp连接中当一端退出,另一端如果阻塞在recv,此时recv会立即返回一个空字串。
2.tcp连接中如果一端已经不存在,仍然试图通过send向其发送数据则会产生BrokenPipeError
3. 一个服务端可以同时连接多个客户端,也能够重复被连接
4.tcp粘包问题
产生原因
1.为了解决数据再传输过程中可能产生的速度不协调问题,操作系统设置了缓冲区
2.实际网络工作过程比较复杂,导致消息收发速度不一致
3.tcp以字节流方式进行数据传输,在接收时不区分消息边界
影响
如果每次发送内容是一个独立的含义,需要接收端独立解析此时粘包会有影响。
处理方法
人为的添加消息边界,用作消息之间的分割
控制发送的速度
多任务编程
进程(Process)
状态:(三态)
就绪态: 进程具备执行条件,等待系统调度分配cpu资源
运行态 : 进程占有cpu正在运行
等待态 : 进程阻塞等待,此时会让出cpu
(五态)
新建 : 创建一个进程,获取资源的过程
终止 : 进程结束,释放资源的过程
进程命令
查看进程信息
ps -aux
-
-
PID : 操作系统分配给进程的编号,大于0的整数,系统中每个进程的PID都不重复。PID也是重要的区分进程的标志。
-
%CPU,%MEM : 占有的CPU和内存
-
STAT : 进程状态信息,S I 表示阻塞状态 ,R 表示就绪状态或者运行状态
-
START : 进程启动时间
-
进程树形结构
pstree
多进程编程
使用模块: multiprocessing
创建流程
【1】 将需要新进程执行的事件封装为函数
【2】 通过模块的Process类创建进程对象,关联函数
【3】 可以通过进程对象设置进程信息及属性
【4】 通过进程对象调用start启动进程
【5】 通过进程对象调用join回收进程资源
Process()
功能 : 创建进程对象
参数 : target 绑定要执行的目标函数
args 元组,用于给target函数位置传参
kwargs 字典,给target函数键值传参
p.start()
功能 : 启动进程
注意 : 启动进程此时target绑定函数开始执行,该函数作为新进程执行内容,此时进程真正被创建
p.join([timeout])
功能:阻塞等待回收进程
参数:超时时间
进程执行现象理解 (难点)
-
-
子进程只执行指定的函数,其余内容均是父进程执行内容,但是子进程也拥有其他父进程资源。
-
各个进程在执行上互不影响,也没有先后顺序关系。
-
进程创建后,各个进程空间独立,相互没有影响。
-
进程对象属性
-
-
p.pid 对应子进程的PID号
-
p.is_alive() 查看子进程是否在生命周期
-
p.daemon 设置父子进程的退出关系
-
如果设置为True则该子进程会随父进程的退出而结束
-
要求必须在start()前设置
-
-
os.getpid()
功能: 获取一个进程的PID值
返回值: 返回当前进程的PID
os.getppid()
功能: 获取父进程的PID号
返回值: 返回父进程PID
sys.exit(info)
功能:退出进程
参数:字符串 表示退出时打印内容
-
孤儿进程 : 父进程先于子进程退出,此时子进程成为孤儿进程。
-
特点: 孤儿进程会被系统进程收养,此时系统进程就会成为孤儿进程新的父进程,孤儿进程退出该进程会自动处理。
-
-
僵尸进程 : 子进程先于父进程退出,父进程又没有处理子进程的退出状态,此时子进程就会成为僵尸进程。
-
特点: 僵尸进程虽然结束,但是会存留部分进程信息资源在内存中,大量的僵尸进程会浪费系统的内存资源。
-
如何避免僵尸进程产生
-
使用join()回收
-
-
-
from signal import * signal(SIGCHLD,SIG_IGN)
创建进程类
-
【1】 继承Process类
【2】 重写
__init__
方法添加自己的属性,使用super()加载父类属性【3】 重写run()方法
-
使用方法
【1】 实例化对象
【2】 调用start自动执行run方法
-
必要性
【1】 进程的创建和销毁过程消耗的资源较多
【2】 当任务量众多,每个任务在很短时间内完成时,需要频繁的创建和销毁进程。此时对计算机压力较大
【3】 进程池技术很好的解决了以上问题。
-
原理
创建一定数量的进程来处理事件,事件处理完进 程不退出而是继续处理其他事件,直到所有事件全都处理完毕统一销毁。增加进程的重复利用,降低资源消耗。
-
进程池实现
from multiprocessing import Pool Pool(processes) 功能: 创建进程池对象 参数: 指定进程数量,默认根据系统自动判定
2.将事件加入进程池队列执行
pool.apply_async(func,args,kwds)
功能: 使用进程池执行 func事件
参数: func 事件函数
args 元组 给func按位置传参
kwds 字典 给func按照键值传参
3.关闭进程池
pool.close()
功能: 关闭进程池
4.回收进程池中进程
pool.join()
功能: 回收进程池中进程
进程通信
-
-
常用进程间通信方法:消息队列,套接字等。
-
消息队列使用
-
通信原理: 在内存中开辟空间,建立队列模型,进程通过队列将消息存入,或者从队列取出完成进程间通信。
-
-
from multiprocessing import Queue q = Queue(maxsize=0) 功能: 创建队列对象 参数:最多存放消息个数 返回值:队列对象 q.put(data,[block,timeout]) 功能:向队列存入消息 参数:data 要存入的内容 block 设置是否阻塞 False为非阻塞 timeout 超时检测 q.get([block,timeout]) 功能:从队列取出消息 参数:block 设置是否阻塞 False为非阻塞 timeout 超时检测 返回值: 返回获取到的内容 q.full() 判断队列是否为满 q.empty() 判断队列是否为空 q.qsize() 获取队列中消息个数 q.close() 关闭队列
群聊聊天室
功能 : 类似qq群功能
【1】 有人进入聊天室需要输入姓名,姓名不能重复
【2】 有人进入聊天室时,其他人会收到通知:xxx 进入了聊天室
【3】 一个人发消息,其他人会收到:xxx : xxxxxxxxxxx
【4】 有人退出聊天室,则其他人也会收到通知:xxx退出了聊天室
【5】 扩展功能:服务器可以向所有用户发送公告:管理员消息: xxxxxxxxx
代码:
""" chat_server.py """ from socket import * from multiprocessing import Process # 服务器地址 HOST = "0.0.0.0" PORT = 8000 ADDR = (HOST, PORT) # 存储用户信息 {name:address} user = {} # 处理进入聊天室 def login(sock, name, addr): if name in user or "管理" in name: # 反馈结果 sock.sendto(b"FAIL", addr) else: sock.sendto(b"OK", addr) # 循环通知其他人 msg = "欢迎 %s 进入聊天室" % name for i in user: sock.sendto(msg.encode(), user[i]) user[name] = addr # 增加该用户 # print(user) # 处理聊天 def chat(sock, name, content): msg = "%s : %s" % (name, content) for i in user: # 除去本人 if i != name: sock.sendto(msg.encode(), user[i]) # 处理退出 def exit(sock, name): del user[name] # 删除用户 msg = "%s 退出了聊天室" % name for i in user: sock.sendto(msg.encode(), user[i]) # 处理客户端请求 def request(sock): # 循环接收各种客户端请求 (总分模式) while True: # 接收所有客户端所有请求 data, addr = sock.recvfrom(1024) # 对数据结构进行简单解析 tmp = data.decode().split(' ', 2) if tmp[0] == "LOGIN": # tmp --> [LOGIN,name] login(sock, tmp[1], addr) elif tmp[0] == "CHAT": # tmp --> [CHAT,name,content] chat(sock, tmp[1], tmp[2]) elif tmp[0] == "EXIT": # tmp--> [EXIT,name] exit(sock, tmp[1]) # 程序启动函数 def main(): # UDP套接字 sock = socket(AF_INET, SOCK_DGRAM) sock.bind(ADDR) # 创建子进程 p = Process(target=request, args=(sock,)) p.daemon = True p.start() # 发送管理员信息 while True: content = input("管理员消息:") if content == 'exit': break msg = "CHAT 管理员消息 " + content sock.sendto(msg.encode(), ADDR) if __name__ == '__main__': main()
""" chat room 客户端代码 """ from socket import * from multiprocessing import Process import sys # 服务器地址 ADDR = ('119.3.124.77', 8000) # 处理登录 def login(sock): while True: # 进入聊天室 name = input("Name:") # 发送姓名 msg = "LOGIN " + name sock.sendto(msg.encode(), ADDR) # 接收结果 result, addr = sock.recvfrom(128) if result.decode() == 'OK': print("进入聊天室") return name else: print("该用户已存在") # 接收消息 def recv_msg(sock): while True: data, addr = sock.recvfrom(1024 * 10) msg = " %s 发言:" % data.decode() print(msg, end="") # 不换行 # 发送消息 def send_msg(sock, name): while True: try: content = input("发言:") except KeyboardInterrupt: content = "exit" # 输入exit表示要退出聊天室 if content == "exit": msg = "EXIT " + name sock.sendto(msg.encode(), ADDR) sys.exit("您已退出聊天室") msg = "CHAT %s %s" % (name, content) # 给服务端发送聊天请求 sock.sendto(msg.encode(), ADDR) # 网络连接 def main(): sock = socket(AF_INET, SOCK_DGRAM) sock.bind(("0.0.0.0",12345)) # 地址不变化 name = login(sock) # 进入聊天室 # 创建子进程 用于接收消息 p = Process(target=recv_msg, args=(sock,)) p.daemon = True # 父进程退出子进程也退出 p.start() send_msg(sock, name) # 父进程发送消息 if __name__ == '__main__': main()
线程(Thread)
什么是线程
线程被称为轻量级的进程,也是多任务编程方式
也可以利用计算机的多cpu资源
线程可以理解为进程中再开辟的分支任务
线程特征
一个进程中可以包含多个
线程线程也是一个运行行为,消耗计算机资源
一个进程中的所有线程共享这个进程的资源
多个线程之间的运行同样互不影响各自运行
from threading import Thread t = Thread() 功能:创建线程对象 参数:target 绑定线程函数 args 元组 给线程函数位置传参 kwargs 字典 给线程函数键值传参
2.启动线程
t.start()
3.回收线程
t.join([timeout])
-
t.setName() 设置线程名称
-
t.getName() 获取线程名称
-
t.is_alive() 查看线程是否在生命周期
-
t.setDaemon() 设置daemon属性值
-
t.isDaemon() 查看daemon属性值
-
-
【1】 继承Thread类
【2】 重写
__init__
方法添加自己的属性,使用super()加载父类属性【3】 重写run()方法
-
使用方法
【1】 实例化对象
【2】 调用start自动执行run方法
-
-
共享资源争夺
-
共享资源:多个进程或者线程都可以操作的资源称为共享资源。对共享资源的操作代码段称为临界区。
-
影响 : 对共享资源的无序操作可能会带来数据的混乱,或者操作错误。此时往往需要同步互斥机制协调操作顺序。
-
-
同步互斥机制
-
- 互斥 : 互斥是一种制约关系,当一个进程或者线程占有资源时会进行加锁处理,此时其他进程线程就无法操作该资源,直到解锁后才能操作。
-
线程Event
from threading import Event e = Event() 创建线程event对象 e.wait([timeout]) 阻塞等待e被set e.set() 设置e,使wait结束阻塞 e.clear() 使e回到未被设置状态 e.is_set() 查看当前e是否被设置
线程锁 Lock
from threading import Lock lock = Lock() 创建锁对象 lock.acquire() 上锁 如果lock已经上锁再调用会阻塞 lock.release() 解锁 with lock: 上锁 ... ... with代码块结束自动解锁
网络并发模型
多进程并发模型
-
-
等待客户端连接
-
客户端连接,则创建新的进程具体处理客户端请求
-
主进程继续等待其他客户端连接
-
""" 多进程并发模型 process_server """ from signal import * from socket import * from multiprocessing import Process # 服务器地址 HOST = "0.0.0.0" PORT = 8888 ADDR = (HOST, PORT) # 实现具体的业务功能,客户端请求都在这里处理 def handle(connfd): # 与客户端配合测试 while True: data = connfd.recv(1024) if not data: break print(data.decode()) connfd.send(b"ok") connfd.close() # 函数中编写并发服务 def main(): sock = socket() # tcp套接字 sock.bind(ADDR) sock.listen(5) print("Listen the port %d" % PORT) signal(SIGCHLD, SIG_IGN) # 处理僵尸进程 while True: # 循环接收客户端连接 try: connfd, addr = sock.accept() print("Connect from", addr) except KeyboardInterrupt: # 退出服务 sock.close() break # 为连接的客户端创建新进程 p = Process(target=handle, args=(connfd,)) p.daemon = True # 客户端随服务端退出 p.start() if __name__ == '__main__': main()
""" tcp_client """ from socket import * # 服务器地址 server_addr = ("127.0.0.1",8888) tcp_socket = socket() tcp_socket.connect(server_addr) # 收发消息 while True: msg = input(">>") if not msg: break tcp_socket.send(msg.encode()) data = tcp_socket.recv(1024) print("从服务器收到:",data.decode()) tcp_socket.close()
多线程并发模型
-
创建网络套接字用于接收客户端请求
-
等待客户端连接
-
客户端连接,则创建新的线程具体处理客户端请求
-
主线程继续等待其他客户端连接
-
如果客户端退出,则销毁对应的线程
""" 多线程并发模型 thread_server """ from socket import * from threading import Thread # 服务器地址 HOST = "0.0.0.0" PORT = 8888 ADDR = (HOST, PORT) # 实现具体的业务功能,客户端请求都在这里处理 class MyThread(Thread): def __init__(self, connfd): self.connfd = connfd super().__init__() def run(self): # 与客户端配合测试 while True: data = self.connfd.recv(1024) if not data: break print(data.decode()) self.connfd.send(b"ok") self.connfd.close() # 函数中编写并发服务 def main(): sock = socket() # tcp套接字 sock.bind(ADDR) sock.listen(5) print("Listen the port %d" % PORT) while True: # 循环接收客户端连接 try: connfd, addr = sock.accept() print("Connect from", addr) except KeyboardInterrupt: # 退出服务 sock.close() break # 为连接的客户端创建新进程 t = MyThread(connfd) t.setDaemon(True) # 客户端随服务端退出 t.start() if __name__ == '__main__': main()
ftp 文件服务器
【1】 分为服务端和客户端,要求可以有多个客户端同时操作。
【2】 客户端可以查看服务器文件库中有什么文件。
【3】 客户端可以从文件库中下载文件到本地。
【4】 客户端可以上传一个本地文件到文件库。
【5】 使用print在客户端打印命令输入提示,引导操作
""" ftp文件管理服务端 多线程 tcp 并发 """ from socket import * from threading import Thread import os from time import sleep # 服务器地址 HOST = "0.0.0.0" PORT = 8888 ADDR = (HOST, PORT) # 文件库位置 FTP = "/home/tarena/FTP/" # 实现具体的业务功能,客户端请求都在这里处理 class FTPServer(Thread): def __init__(self, connfd): self.connfd = connfd super().__init__() # 处理请求文件列表 def do_list(self): # 判断文件库是否为空 files = os.listdir(FTP) if not files: self.connfd.send(b"FAIL") # 失败 else: self.connfd.send(b"OK") sleep(0.1) # 一次发送所有文件名 data = " ".join(files) self.connfd.send(data.encode()) sleep(0.1) self.connfd.send(b"##") # 处理上传 def do_put(self, filename): # 判断文件是否已存在 if os.path.exists(FTP + filename): self.connfd.send(b"FAIL") return else: self.connfd.send(b"OK") # 接收文件 f = open(FTP + filename, 'wb') while True: data = self.connfd.recv(1024) if data == b"##": break f.write(data) f.close() # 处理下载 def do_get(self, filename): try: f = open(FTP + filename, 'rb') except: # 文件不存在 self.connfd.send(b"FAIL") return else: self.connfd.send(b"OK") sleep(0.1) # 发送文件 while True: data = f.read(1024) if not data: break self.connfd.send(data) sleep(0.1) self.connfd.send(b"##") f.close() # 线程启动方法 def run(self): while True: # 接收某一个各类请求 data = self.connfd.recv(1024).decode() # print(data) if not data or data == "EXIT": break elif data == "LIST": self.do_list() elif data[:4] == "STOR": filename = data.split(' ')[-1] self.do_put(filename) elif data[:4] == "RETR": filename = data.split(' ')[-1] self.do_get(filename) self.connfd.close() # 函数中搭建并发结构 def main(): sock = socket() # tcp套接字 sock.bind(ADDR) sock.listen(5) print("Listen the port %d" % PORT) while True: # 循环接收客户端连接 try: connfd, addr = sock.accept() print("Connect from", addr) except KeyboardInterrupt: # 退出服务 sock.close() break # 使用自定义线程类为连接的客户端创建新线程 t = FTPServer(connfd) t.setDaemon(True) # 客户端随服务端退出 t.start() if __name__ == '__main__': main()
""" ftp 文件服务 客户端 c / s 连接 发请求 获取结果 """ from socket import * import sys from time import sleep # 服务端地址 ADDR = ("127.0.0.1", 8888) # 发起请求的所有功能 class FTPClient: def __init__(self, sock): self.sock = sock # 请求文件列表 def do_list(self): self.sock.send(b"LIST") # 发送请求 result = self.sock.recv(128).decode() # 等回复 # 根据不同结果分情况处理 if result == "OK": # 接收文件列表 while True: file = self.sock.recv(1024).decode() if file == '##': break print(file) else: print("文件库为空") # 退出 def do_exit(self): self.sock.send(b"EXIT") self.sock.close() sys.exit("谢谢使用") # 处理上传 def do_put(self, file): # 测一下这个文件是否存在 try: f = open(file, 'rb') except: print("文件不存在") return # 防止file带有文件路径,提取文件名 filename = file.split("/")[-1] data = "STOR " + filename self.sock.send(data.encode()) # 发请求 result = self.sock.recv(128).decode() # 等待回复 if result == 'OK': # 上传文件 读--》发送 while True: data = f.read(1024) if not data: break self.sock.send(data) sleep(0.1) self.sock.send(b"##") f.close() else: print("该文件已存在") # 处理下载 def do_get(self, file): data = "RETR " + file self.sock.send(data.encode()) # 发请求 result = self.sock.recv(128).decode() # 等待回复 if result == 'OK': # 接收文件 f = open(file, 'wb') while True: data = self.sock.recv(1024) if data == b"##": break f.write(data) f.close() else: print("该文件不存在") # 启动函数 def main(): sock = socket() sock.connect(ADDR) # 实例化对象用于调用方法 ftp = FTPClient(sock) while True: print(""" ============ 命令选项 ============= list get file put file exit ================================== """) cmd = input("请输入命令:") if cmd == "list": ftp.do_list() elif cmd == 'exit': ftp.do_exit() elif cmd[:3] == 'put': file = cmd.split(' ')[-1] ftp.do_put(file) elif cmd[:3] == 'get': file = cmd.split(' ')[-1] ftp.do_get(file) else: print("请输入正确命令") if __name__ == '__main__': main()
IO并发模型
IO概述
-
什么是IO
-
程序分类
-
IO密集型程序:在程序执行中有大量IO操作,而运算操作较少。消耗cpu较少,耗时长。
-
计算密集型程序:程序运行中运算较多,IO操作相对较少。cpu消耗多,执行速度快,几乎没有阻塞。
-
-
IO分类:阻塞IO ,非阻塞IO,IO多路复用,异步IO等。
-
定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态。
-
效率:阻塞IO效率很低。但是由于逻辑简单所以是默认IO行为。
-
阻塞情况
-
因为某种执行条件没有满足造成的函数阻塞 e.g. accept input recv
-
处理IO的时间较长产生的阻塞状态 e.g. 网络传输,大文件读写
-
非阻塞IO
-
定义 :通过修改IO属性行为,使原本阻塞的IO变为非阻塞的状态。
-
设置套接字为非阻塞IO
sockfd.setblocking(bool)
功能:设置套接字为非阻塞IO
参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞
- 超时检测 :设置一个最长阻塞时间,超过该时间后则不再阻塞等待。
sockfd.settimeout(sec)
功能:设置套接字的超时时间
参数:设置的时间
-
定义
同时监控多个IO事件,当哪个IO事件准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率。
-
具体方案
-
select方法 : windows linux unix
-
poll方法: linux unix
-
epoll方法: linux
-
select 方法
rs, ws, xs=select(rlist, wlist, xlist[, timeout])
功能: 监控IO事件,阻塞等待IO发生
参数:rlist 列表 读IO列表,添加等待发生的或者可读的IO事件
wlist 列表 写IO列表,存放要可以主动处理的或者可写的IO事件
xlist 列表 异常IO列表,存放出现异常要处理的IO事件
timeout 超时时间
返回值: rs 列表 rlist中准备就绪的IO
ws 列表 wlist中准备就绪的IO
xs 列表 xlist中准备就绪的IO
""" 基于select的 IO网络并发模型 IO 多路复用与非阻塞搭配 重点代码!!! """ from socket import * from select import select # 网络地址 HOST = "0.0.0.0" PORT = 8888 ADDR = (HOST, PORT) # 创建监听套接字 sockfd = socket() sockfd.bind(ADDR) sockfd.listen(5) # 设置套接字为非阻塞IO sockfd.setblocking(False) # 初始只有监听套接字,先关注他 rlist = [sockfd] # 客户端连接 wlist = [] xlist = [] # 循环(开始)监控IO对象 while True: rs, ws, xs = select(rlist, wlist, xlist) # 处理就绪的IO (不能出现IO阻塞) for r in rs: # 有客户端连接 if r is sockfd: connfd, addr = rs.accept() print("Connect from", addr) # 将客户端连接套接字也监控起来 connfd.setblocking(False) rlist.append(connfd) else: # 某个客户端发消息 connfd 就绪 data = r.recv(1024).decode() # 客户端退出处理 if not data: rlist.remove(r) r.close() continue print(data) # r.send(b"OK") wlist.append(r) # 存入写列表 for w in ws: w.send(b"OK") wlist.remove(w)
poll方法
p = select.poll()
功能 : 创建poll对象
返回值: poll对象
p.register(fd,event) 功能: 注册关注的IO事件 参数:fd 要关注的IO event 要关注的IO事件类型 常用类型:POLLIN 读IO事件(rlist) POLLOUT 写IO事件 (wlist) POLLERR 异常IO (xlist) POLLHUP 断开连接 e.g. p.register(sockfd,POLLIN|POLLERR) p.unregister(fd) 功能:取消对IO的关注 参数:IO对象或者IO对象的fileno
events = p.poll()
功能: 阻塞等待监控的IO事件发生
返回值: 返回发生的IO
events格式 [(fileno,event),()....]
每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型
""" 训练: 使用poll方法完成与select_server相同的功能性代码 """ from socket import * from select import * # 网络地址 HOST = "0.0.0.0" PORT = 8888 ADDR = (HOST, PORT) # 创建监听套接字 sockfd = socket() sockfd.bind(ADDR) sockfd.listen(5) # 设置套接字为非阻塞IO sockfd.setblocking(False) p = poll() # 生成poll对象 # 查找字典 fileno --> io object map = {sockfd.fileno(): sockfd} # 初始监听套接字,先关注他 p.register(sockfd, POLLIN) # 循环监控IO对象 while True: # events --> [(fileno,event),()] events = p.poll() # 遍历events 处理就绪的IO for fd, event in events: # 分类讨论 if fd == sockfd.fileno(): connfd, addr = map[fd].accept() print("Connect from", addr) # 将客户端连接套接字也监控起来 connfd.setblocking(False) p.register(connfd, POLLIN | POLLERR) map[connfd.fileno()] = connfd # 维护字典 elif event == POLLIN: # 某个客户端发消息 connfd 就绪 data = map[fd].recv(1024).decode() # 客户端退出处理 if not data: p.unregister(fd) # 不在关注 map[fd].close() del map[fd] # 从字典删除 continue print(data) map[fd].send(b"OK")
epoll方法
生成对象改为 epoll()
将所有事件类型改为EPOLL类型
-
epoll特点
-
epoll 效率比select poll要高
-
epoll 监控IO数量比select要多
-
-
""" 训练: 使用epoll方法完成与select_server相同的功能性代码 基于epoll的 IO网络模型 IO 多路复用与非阻塞搭配 重点代码!!! """ from socket import * from select import * # 网络地址 HOST = "0.0.0.0" PORT = 8888 ADDR = (HOST, PORT) # 创建监听套接字 sockfd = socket() sockfd.bind(ADDR) sockfd.listen(5) # 设置套接字为非阻塞IO sockfd.setblocking(False) p = epoll() # 生成epoll对象 # 查找字典 fileno --> io object map = {sockfd.fileno(): sockfd} # 初始监听套接字,先关注他 p.register(sockfd, EPOLLIN) # 循环监控IO对象 while True: # events --> [(fileno,event),()] events = p.poll() # 遍历events 处理就绪的IO for fd, event in events: # 分类讨论 if fd == sockfd.fileno(): connfd, addr = map[fd].accept() print("Connect from", addr) # 将客户端连接套接字也监控起来 connfd.setblocking(False) p.register(connfd, EPOLLIN | EPOLLERR) map[connfd.fileno()] = connfd # 维护字典 elif event == EPOLLIN: # 某个客户端发消息 connfd 就绪 data = map[fd].recv(1024).decode() # 客户端退出处理 if not data: p.unregister(fd) # 不在关注 map[fd].close() del map[fd] # 从字典删除 continue print(data) # map[fd].send(b"OK") p.unregister(fd) p.register(fd, EPOLLOUT) elif event == EPOLLOUT: map[fd].send(b"OK") p.unregister(fd) p.register(fd, EPOLLIN)
利用IO多路复用等技术,同时处理多个客户端IO请求。
-
优点 : 资源消耗少,能同时高效处理多个IO行为
-
缺点 : 只针对处理并发产生的IO事件
-
适用情况:HTTP请求,网络传输等都是IO行为,可以通过IO多路复用监控多个客户端的IO请求。
-
并发服务实现过程
【1】将关注的IO准备好,通常设置为非阻塞状态。
【2】通过IO多路复用方法提交,进行IO监控。
【3】阻塞等待,当监控的IO有发生时,结束阻塞。
【4】遍历返回值列表,确定就绪IO事件。
【5】处理发生的IO事件。