I/O多路复用概念
通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
Python中有一个select模块,其中提供了:select 、poll 、epoll三个方法,分别调用系统的select、poll、epoll从而实现IO多路复用
Windows Python:
提供:select
Mac Python:
提供:select
Linux Python:
提供:select poll epoll
注意:网络操作、文件操作、终端操作等均属于IO操作
select实例一:终端操作
select监听文件句柄,当文件句柄序列发生变化,select调用系统接口得到变化
import select import sys while True: # 当用户输入内容时,stdin会发生改变,select循环检测,检测到改变后会将内容放到readable(读列表中)中 # readable 读列表 writeable写列表 error 错误列表 # timeout == 1 # 当用户没有输入的时候,列表就是空的一直 readable,writeable,error = select.select([sys.stdin,],[],[],1) if sys.stdin in readable: print "select get stdin",sys.stdin.readline()
select实例二:服务端操作
#!/usr/bin/env python # coding:utf-8 import socket import select ip_port = ('127.0.0.1',8888) sk = socket.socket() sk.bind((ip_port)) sk.listen(5) # 设置不阻塞,服务端不需要一直在等待客户端的输入 sk.setblocking(False) sk1 = socket.socket() sk1.bind(('127.0.0.1',9999)) sk1.listen(5) # 是否阻塞(默认True),如果设置False,那么accept和recv时一旦无数据,则报错。 sk1.setblocking(False) while True: R_list,w_list,E_list = select.select([sk,sk1],[],[],2) for r in R_list: # conn是客户端的句柄,addr是客户端的地址
conn,addr = r.accept() print addr
select实例三:通过select实现处理多请求
server端代码:
#!/usr/bin/env python # coding:utf-8 import socket import select import time ip_port = ('127.0.0.1',8888) sk = socket.socket() sk.bind(ip_port) sk.listen(5) sk.setblocking(False) inputs = [sk] while True: R_list,w_list,E_list = select.select(inputs,[],[],2) time.sleep(2) print "input:",inputs print "result",R_list for r in R_list: if r == sk: conn,addr = r.accept() inputs.append(conn) print addr else: client_data = r.recv(1024) r.sendall(client_data)
client端代码:
#!/usr/bin/env python # coding:utf-8 import socket ip_port= ('127.0.0.1',8888) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: inp = raw_input('please input:') sk.sendall(inp) print sk.recv(1024) sk.close()
执行结果
# 没有客户端连接的时候,只监听着服务端 input: [<socket._socketobject object at 0x020E2B20>] result [] input: [<socket._socketobject object at 0x020E2B20>] # 当有客户端接入的时候 result [<socket._socketobject object at 0x020E2B20>] ('127.0.0.1', 62692) # 多了client的文件句柄 input: [<socket._socketobject object at 0x020E2B20>, <socket._socketobject object at 0x020E2B58>] result [] input: [<socket._socketobject object at 0x020E2B20>, <socket._socketobject object at 0x020E2B58>] result []
select实例四:使用后面三个参数,同时当client端失去连接的时候,从连接中删除
#!/usr/bin/env python # coding:utf-8 import socket import select import time ip_port = ('127.0.0.1',8888) sk = socket.socket() sk.bind(ip_port) sk.listen(5) sk.setblocking(False) inputs = [sk] output = [] while True: # inputs:只有变化了,R_list才接收到,不变化不接收 # output:只要存在,w_list就一直接收 # inputs:如果有报错,会接收到E_list中 # 2: timeout 超时时间,超过两秒就不阻塞,执行下面的内容;如果不加时间,没有client端连接进来,他就一直堵塞 R_list,w_list,E_list = select.select(inputs,output,inputs,2) time.sleep(2) print "input:",inputs for r in R_list: if r == sk: conn,addr = r.accept() inputs.append(conn) print addr else: # 判断客户端是否存活,如果不存活就剔除 client_data = r.recv(1024) if client_data: r.sendall(client_data) else: inputs.remove(r)
select实例六:output讲解,与客户端接收发送数据
#!/usr/bin/env python # coding:utf-8 import socket import select import time ip_port = ('127.0.0.1',8888) sk = socket.socket() sk.bind(ip_port) sk.listen(5) inputs = [sk] output = [] while True: # inputs:只有变化了,R_list才接收到,不变化不接收 # output:只要存在,w_list就一直接收 # inputs:如果有报错,会接收到E_list中 # 2: timeout 超时时间,超过两秒就不阻塞,执行下面的内容 R_list,W_list,E_list = select.select(inputs,output,inputs,2) print "output:",output for r in R_list: if r == sk: conn,addr = r.accept() inputs.append(conn) else: # 判断客户端是否存活,如果不存活就剔除 client_data = r.recv(1024) if client_data: # 获取数据 output.append(r) for w in W_list: w.sendall('123456') output.remove(w)
Queue
简单操作
#!/usr/bin/env python # coding:utf-8 import Queue # 定义一个对象 obj = Queue.Queue() # 打印对象队列的大小 print obj.qsize() # 向队列中添加数据 obj.put(1) print obj.qsize() obj.put(2) print obj.qsize() # 获取值(FIFO) print obj.get() # 定义队列的长度 obj = Queue.Queue(3) obj.put(1) obj.put(2) obj.put(3) print obj.qsize() # 队列满了后,不等待直接添加会报异常 obj.put_nowait(123) print obj.qsize() obj = Queue.Queue() # 队列中没有数据,会一直等待 #obj.get() # 不等待 # 队列中没有数据,会触发异常, obj.get_nowait() # 使用异常处理 try: obj.get_nowait() except Queue.Empty: print 'error'
使用队列来实现
#!/usr/bin/env python # coding:utf-8 import socket import select import Queue ip_port = ('127.0.0.1',8888) sk = socket.socket() sk.bind(ip_port) sk.listen(5) inputs = [sk] output = [] message = {} # message = { # 'c1':队列, # 'c2':队列,【b,bb,bbb】 # } while True: # inputs:只有变化了,R_list才接收到,不变化不接收 # output:只要存在,w_list就一直接收 # inputs:如果有报错,会接收到E_list中 # 2: timeout 超时时间,超过两秒就不阻塞,执行下面的内容 R_list,W_list,E_list = select.select(inputs,output,inputs,2) print "output:",output for r in R_list: if r == sk: conn,addr = r.accept() inputs.append(conn) message[conn] = Queue.Queue() else: # 判断客户端是否存活,如果不存活就剔除 client_data = r.recv(1024) if client_data: # 获取数据 output.append(r) # 在指定的队列中插入数据 message[r].put(client_data) else: inputs.remove(r) del message[r] for w in W_list: # 去队列中去数据 try: data = message[w].get_nowait() w.sendall(data) except Queue.Empty: pass output.remove(w)
此处的Socket服务器端相比原生的Socket,他支持当某一个请求不再发送数据时,服务器端不会等待而是可以去处理其他的请求数据,但是,如果每个请求的耗时比较长时,select版本的服务器端也无法完成同时操作
SockertServer
SocketServer内部使用IO多路复用,以及多线程和多进程,从而实现并发处理多个客户端请求的Socket服务端,即:每个客户端请求连接到服务器时,Socket服务器端都会在服务器上创建一个线程或者进程,专门负责当前客户端的所有请求
计算密集型用进程,IO密集型用线程
ThreadingTCPServer进程
ThreadingTCPServer实现的Socket服务器内部会为每个client端创建一个线程,该线程用来和客户端进行交互
1: ThreadingTCPServer 基础
使用ThreadingTCPServer :
- 创建一个继承自SocketServer.BaseRequestHandler的类
- 类中必须定义一个名称为handler的方法
- 启动ThreadingTCPServer
服务器端:
#!/usr/bin/env python # coding:utf-8 import SocketServer class MyServer(SocketServer.BaseRequestHandler): # 必须有一个handle方法 def handle(self): print self.request,self.client_address,self.server conn = self.request conn.sendall('wlecome ........') Flag = True while Flag: data = conn.recv(1024) if data == 'exit': Flag = False elif data == '0': conn.sendall('2345678') else: conn.sendall('input:') if __name__ == '__main__': server = SocketServer.ThreadingTCPServer(('127.0.0.1'),MyServer) server.serve_forever()
客户端:
#!/usr/bin/env python # coding:utf-8 import socket ip_port = ('127.0.0.1',8009) sk = socket.socket sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print 'receive:',data inp = raw_input('please input') sk.sendall(inp) if inp == 'exit': break sk.close()
twisted 事件驱动
Twisted是一个事件驱动的网络框架,其中包含了诸多功能,例如:网络协议、线程、数据库管理、网络操作、电子邮件等。
事件驱动:分为两部分,第一是注册事件,第二是触发事件
自定义一个事件驱动框架
#!/usr/bin/env python # -*- coding:utf-8 -*- # event_drive.py event_list = [] def run(): for event in event_list: obj = event() obj.execute() class BaseHandler(object): """ 用户必须继承该类,从而规范所有类的方法(类似于接口的功能) """ def execute(self): raise Exception('you must overwrite execute')
创建一个python Package,
将这个文件夹放在C:Python27Libsite-packages
使用上面的事件驱动框架
#!/usr/bin/env python # -*- coding:utf-8 -*- from source import event_drive class MyHandler(event_drive.BaseHandler): def execute(self): print 'event-drive execute MyHandler' # 注册一个事件,把MyHandler这个类当作一个事件,注册到event_list列表中 event_drive.event_list.append(MyHandler) event_drive.run()
Twisted-15.5.0的安装
- 解压文件
- 进入到解压的文件的目录中
- 执行编译:python setup.py build
- 执行安装:python setup.py install
python源码安装都是上面的流程