Original article: http://scotdoyle.com/python-epoll-howto.html (translation published in ani_di's column on CSDN.NET)
How To Use Linux epoll with Python
Contents
- Introduction
- Blocking Socket Programming Examples
- Benefits of Asynchronous Sockets and Linux epoll
- Asynchronous Socket Programming Examples with epoll
- Performance Considerations
- Source Code
Introduction
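As of version 2.6, Python includes an API for accessing the Linux epoll library. This article uses Python 3 examples to briefly demonstrate that API. Questions and feedback are welcome.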
Blocking Socket Programming Examples
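Example 1 is a simple Python 3 server that listens on port 8080 for an HTTP request, prints the request to the console, and sends a response back to the client.
- Line 9: Create the server socket.
- Line 10: Set SO_REUSEADDR so that the bind() on line 11 succeeds even if another program was recently listening on the same port.
- Line 11: Bind to port 8080 on all local IPv4 addresses.
- Line 12: Start listening for connections on this socket.
- Line 14: The program blocks here until a new connection arrives. The accept() call then returns a socket object for that connection together with the address of the client.
- Lines 15-17: Assemble the data sent by the client until the complete request has arrived. For the HTTP protocol, see HTTP Made Easy.
- Line 18: Print the request.
- Line 19: Send the response back to the client.
- Lines 20-22: Close the connection to the client and the listening socket.
The official HOWTO covers socket programming in more detail.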
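The loop on lines 15-17 of Example 1 keeps reading until the blank line that ends the HTTP request headers has arrived. As a small sketch of that check on its own (the helper name `request_complete` is made up for illustration and does not appear in the original examples):

```python
# Hypothetical helper: the original examples inline this test in a while loop.
EOL1 = b'\n\n'      # blank line from a client that uses bare LF line endings
EOL2 = b'\n\r\n'    # blank line from a client that uses CRLF line endings


def request_complete(buffer: bytes) -> bool:
    """Return True once the buffer holds the blank line that ends the headers."""
    return EOL1 in buffer or EOL2 in buffer


partial = b'GET / HTTP/1.0\r\nHost: localhost\r\n'
print(request_complete(partial))             # headers are still open
print(request_complete(partial + b'\r\n'))   # the blank line has arrived
```

Checking for both terminators lets the server accept clients that end lines with LF only as well as those that send CRLF.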
Example 1 (All examples use Python 3)
    import socket

    EOL1 = b'\n\n'
    EOL2 = b'\n\r\n'
    response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
    response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
    response += b'Hello, world!'

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)

    connectiontoclient, address = serversocket.accept()  # blocks until a client connects
    request = b''
    while EOL1 not in request and EOL2 not in request:
        request += connectiontoclient.recv(1024)         # blocks until data arrives
    print(request.decode())
    connectiontoclient.send(response)
    connectiontoclient.close()

    serversocket.close()
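Example 2 adds a loop at line 15 so that the server handles client connections one after another until the user interrupts it. It also makes clear that the server socket is never used to exchange data with a client. Instead, a new socket is created for each connection, and that new socket carries the exchange.
The finally block on lines 23-24 ensures that the server socket is always closed, even if an exception occurs (such as a user interrupt).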
Example 2
    import socket

    EOL1 = b'\n\n'
    EOL2 = b'\n\r\n'
    response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
    response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
    response += b'Hello, world!'

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)

    try:
        while True:                                          # one client at a time
            connectiontoclient, address = serversocket.accept()
            request = b''
            while EOL1 not in request and EOL2 not in request:
                request += connectiontoclient.recv(1024)
            print('-'*40 + '\n' + request.decode()[:-2])
            connectiontoclient.send(response)
            connectiontoclient.close()
    finally:
        serversocket.close()
Benefits of Asynchronous Sockets and Linux epoll
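The sockets in Example 2 are called blocking sockets, because the program stops running until an event occurs. The accept() call on line 16 blocks until a client connects. The recv() call on line 19 blocks until data has been received from the client (or until there is no more data to receive). The send() call on line 21 blocks until the message has been placed in the send queue.
A server built on blocking sockets usually handles each connection in its own thread (a separate process also works). The main program is only responsible for listening: it accepts one connection at a time and hands the newly created socket to a worker thread. All blocking then happens inside the worker threads, so connections from different clients do not affect one another.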
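The thread-per-connection model described above can be sketched as follows. This is only an illustration under assumptions: the names `handle_client`, `serve` and `demo` are invented here, the server stops after a fixed number of connections so the sketch terminates, and it listens on a loopback port chosen by the operating system rather than on 8080.

```python
import socket
import threading

RESPONSE = (b'HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n'
            b'Content-Length: 13\r\n\r\nHello, world!')


def handle_client(connection):
    """Runs in a worker thread; blocking here stalls only this one client."""
    with connection:
        request = b''
        while b'\n\n' not in request and b'\n\r\n' not in request:
            chunk = connection.recv(1024)
            if not chunk:          # client closed before finishing the request
                return
            request += chunk
        connection.sendall(RESPONSE)


def serve(serversocket, max_connections):
    """The main loop only accepts; each connection gets its own thread."""
    workers = []
    for _ in range(max_connections):
        connection, _address = serversocket.accept()
        worker = threading.Thread(target=handle_client, args=(connection,))
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()


def demo():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('127.0.0.1', 0))   # port 0: let the OS pick a free port
    serversocket.listen(5)
    port = serversocket.getsockname()[1]
    server = threading.Thread(target=serve, args=(serversocket, 1))
    server.start()
    with socket.create_connection(('127.0.0.1', port)) as client:
        client.sendall(b'GET / HTTP/1.0\r\n\r\n')
        reply = b''
        while True:
            chunk = client.recv(1024)
            if not chunk:          # server closed the connection
                break
            reply += chunk
    server.join()
    serversocket.close()
    return reply


if __name__ == '__main__':
    print(demo().decode())
```

Every accepted connection costs one thread in this design, which is the overhead the asynchronous approach tries to avoid.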
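Blocking sockets are intuitive and easy to understand, but they have a number of drawbacks. Coordinating shared resources across threads is difficult, and the model is not very efficient on a machine with a single CPU.
The C10K Problem lists several ways of handling concurrent sockets. One of them is asynchronous sockets. These sockets do not block while waiting for an event. Instead, the program performs an operation and is told right away whether it succeeded or failed, and that information helps it decide what to do next. A server using asynchronous sockets can handle concurrent connections in a single thread, and use multiple threads only for access to blocking resources such as databases.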
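To make the non-blocking behaviour concrete, here is a minimal sketch. It assumes `socket.socketpair()` as a stand-in for a real client connection, purely to keep the example self-contained; a server would get its sockets from accept().

```python
import socket

# Two already-connected sockets; `left` plays the role of the server side.
left, right = socket.socketpair()
left.setblocking(False)

try:
    left.recv(1024)                 # nothing has been sent yet
    outcome = 'got data'
except BlockingIOError:
    outcome = 'would block'         # the call returned at once instead of waiting

right.sendall(b'ping')
data = left.recv(1024)              # data is waiting now, so recv() succeeds

print(outcome, data)
left.close()
right.close()
```

In Python 3 the "operation would block" condition surfaces as BlockingIOError, a subclass of OSError (for which socket.error is an alias).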
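Linux 2.6 has a number of mechanisms for managing asynchronous sockets, three of which are exposed through Python's select, poll and epoll APIs. epoll and poll are better than select because the program does not have to check each socket for events of interest itself; it can rely on the operating system to report which sockets may have them. epoll is in turn better than poll because the operating system does not have to inspect every registered socket each time the program asks. Linux tracks the events as they occur and returns a list when queried. epoll is therefore more efficient and scales better for large numbers (thousands) of concurrent connections, as shown in these graphs.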
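For a side-by-side look at the three interfaces, the sketch below asks each of them whether the same socket is readable. It is only meant to show the shape of the calls, not their relative performance, and it again assumes a `socket.socketpair()` in place of real network connections.

```python
import select
import socket

reader, writer = socket.socketpair()
reader_fd = reader.fileno()
writer.sendall(b'x')                      # make `reader` readable

# select: the full lists of sockets are passed in on every call.
readable, _, _ = select.select([reader], [], [], 0)
select_ready = [s.fileno() for s in readable]

# poll: register once, then query; the timeout is given in milliseconds.
poller = select.poll()
poller.register(reader_fd, select.POLLIN)
poll_ready = [fd for fd, event in poller.poll(0) if event & select.POLLIN]

# epoll (Linux only): register once, then query; the timeout is in seconds.
ep = select.epoll()
ep.register(reader_fd, select.EPOLLIN)
epoll_ready = [fd for fd, event in ep.poll(0) if event & select.EPOLLIN]
ep.close()

print(select_ready, poll_ready, epoll_ready)
reader.close()
writer.close()
```

All three report the same descriptor here; the differences the text describes only start to matter when thousands of sockets are being watched.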
Asynchronous Socket Programming Examples with epoll
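Programs that use epoll generally follow this sequence:
1. Create an epoll object.
2. Tell the epoll object to monitor specific events on specific sockets.
3. Ask the epoll object which sockets have had those events since the last query.
4. Perform some action on those sockets.
5. Tell the epoll object to modify the list of sockets or events to monitor.
6. Repeat steps 3 through 5 until finished.
7. Destroy the epoll object.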
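The sequence above can be traced in a few lines. The sketch below is a simplified illustration, not the article's server: it assumes a `socket.socketpair()` instead of a listening socket, handles a single connection, and simply echoes the received bytes back in upper case.

```python
import select
import socket

client, server_side = socket.socketpair()   # stand-in for an accepted connection
server_side.setblocking(False)
fd = server_side.fileno()

epoll = select.epoll()                           # step 1: create the epoll object
try:
    epoll.register(fd, select.EPOLLIN)           # step 2: watch for readable events
    client.sendall(b'hello')
    received = b''
    done = False
    while not done:                              # step 6: repeat steps 3 to 5
        for fileno, event in epoll.poll(1):      # step 3: which sockets have events?
            if event & select.EPOLLIN:           # step 4: act on the socket
                received += server_side.recv(1024)
                epoll.modify(fd, select.EPOLLOUT)    # step 5: now wait to write
            elif event & select.EPOLLOUT:
                server_side.send(received.upper())
                done = True
finally:
    epoll.close()                                # step 7: destroy the epoll object

reply = client.recv(1024)
print(received, reply)
client.close()
server_side.close()
```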
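Example 3 is the asynchronous-socket version of Example 2. It is more complex than the original.
- Line 1: Import the select module, which contains the epoll functionality.
- Line 13: Sockets are blocking by default, so the server socket is switched to non-blocking (asynchronous) mode.
- Line 15: Create an epoll object.
- Line 16: Register interest in read events (EPOLLIN, available for read) on the server socket.
- Line 19: Set up the connections dictionary, which maps file descriptors (integers) to their network connection objects.
- Line 21: Query the epoll object for events. The parameter 1 means the call waits up to one second. If any events of interest have occurred, the call returns immediately with a list of them.
- Line 22: Each entry in the event list holds a fileno and an event code. fileno is the file descriptor, an integer that identifies the socket within this process.
- Line 26: Register interest in read events on the new socket.
- Line 31: If a read event occurred, read the new data sent over the client socket.
- Line 33: Once the complete request has been read, change the registration for this socket to the write-ready event (EPOLLOUT), which is used for sending the response.
- Line 34: Print the request that was read.
- Line 35: If the client socket is ready for writing.
- Lines 36-38: Send the data to the client.
- Line 39: Once everything has been sent, stop monitoring the socket for any events.
- Line 40: Shut the socket down, which tells both sides that the exchange is over.
- Line 41: A HUP (hang-up) event indicates that the connection has been broken. HUP events do not need to be registered; the epoll object always reports them.
- Line 42: Unregister the disconnected socket.
- Line 43: Close the socket connection.
- Lines 18-45: The try block ensures that the epoll object is closed properly.
- Lines 46-48: Unregister and close the server socket. (Python closes all open files and sockets automatically when the program ends.)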
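The note on line 41 says that hang-up events are delivered even though they were never registered. A quick way to observe this is sketched below, assuming a Unix-domain `socket.socketpair()`, where closing one end is reported as a hang-up on the other. TCP connections behave slightly differently: a peer that merely closes its end first shows up as a readable socket whose recv() returns zero bytes.

```python
import select
import socket

ours, peer = socket.socketpair()
epoll = select.epoll()
epoll.register(ours.fileno(), select.EPOLLIN)   # EPOLLHUP is not requested

peer.close()                                     # the other side hangs up

events = epoll.poll(1)
hung_up = any(event & select.EPOLLHUP for _fileno, event in events)
print(hung_up)

epoll.unregister(ours.fileno())
epoll.close()
ours.close()
```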
Example 3
    import socket, select

    EOL1 = b'\n\n'
    EOL2 = b'\n\r\n'
    response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
    response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
    response += b'Hello, world!'

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)
    serversocket.setblocking(0)

    epoll = select.epoll()
    epoll.register(serversocket.fileno(), select.EPOLLIN)

    try:
        connections = {}; requests = {}; responses = {}
        while True:
            events = epoll.poll(1)
            for fileno, event in events:
                if fileno == serversocket.fileno():
                    connection, address = serversocket.accept()
                    connection.setblocking(0)
                    epoll.register(connection.fileno(), select.EPOLLIN)
                    connections[connection.fileno()] = connection
                    requests[connection.fileno()] = b''
                    responses[connection.fileno()] = response
                elif event & select.EPOLLIN:
                    requests[fileno] += connections[fileno].recv(1024)
                    if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                        epoll.modify(fileno, select.EPOLLOUT)
                        print('-'*40 + '\n' + requests[fileno].decode()[:-2])
                elif event & select.EPOLLOUT:
                    byteswritten = connections[fileno].send(responses[fileno])
                    responses[fileno] = responses[fileno][byteswritten:]
                    if len(responses[fileno]) == 0:
                        epoll.modify(fileno, 0)
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLHUP:
                    epoll.unregister(fileno)
                    connections[fileno].close()
                    del connections[fileno]
    finally:
        epoll.unregister(serversocket.fileno())
        epoll.close()
        serversocket.close()
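epoll has two modes of operation, edge-triggered and level-triggered. In edge-triggered mode, a read or write event on a socket is returned by epoll.poll() only once. The program must therefore process all of the data associated with that event, because later calls to poll() will not report it again. When the data from a particular event is exhausted, additional attempts to operate on the socket will cause an exception. In level-triggered mode, by contrast, the notification is repeated on every call until all of the data has been processed.
For example, suppose a server socket has been registered for EPOLLIN events. In edge-triggered mode the program needs to keep calling accept() until a socket.error exception occurs, whereas in level-triggered mode it can call accept() once per notification, as Example 3 does.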
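The difference between the two modes can be observed directly. The sketch below polls twice without reading the pending data and counts the notifications; the helper name `count_notifications` is invented for this illustration, and a `socket.socketpair()` stands in for a client connection.

```python
import select
import socket


def count_notifications(extra_flags):
    """Poll twice without reading and count how often EPOLLIN is reported."""
    receiver, sender = socket.socketpair()
    epoll = select.epoll()
    epoll.register(receiver.fileno(), select.EPOLLIN | extra_flags)
    sender.sendall(b'unread data')
    notifications = 0
    for _ in range(2):
        notifications += len(epoll.poll(0))   # timeout 0: do not wait
    epoll.close()
    receiver.close()
    sender.close()
    return notifications


level = count_notifications(0)               # level-triggered (the default)
edge = count_notifications(select.EPOLLET)   # edge-triggered
print(level, edge)
```

The level-triggered registration is reported by both polls because the data is still unread, while the edge-triggered one is reported only by the first.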
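Example 3 uses level-triggered mode, which is also the default. Example 4 shows how to use edge-triggered mode. Lines 25, 36 and 45 introduce loops that keep running until an exception occurs. Finally, lines 16, 28, 41 and 51 add the EPOLLET mask, which switches the socket to edge-triggered mode.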
Example 4
    import socket, select

    EOL1 = b'\n\n'
    EOL2 = b'\n\r\n'
    response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
    response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
    response += b'Hello, world!'

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)
    serversocket.setblocking(0)

    epoll = select.epoll()
    epoll.register(serversocket.fileno(), select.EPOLLIN | select.EPOLLET)

    try:
        connections = {}; requests = {}; responses = {}
        while True:
            events = epoll.poll(1)
            for fileno, event in events:
                if fileno == serversocket.fileno():
                    try:
                        while True:  # accept until no connection is pending
                            connection, address = serversocket.accept()
                            connection.setblocking(0)
                            epoll.register(connection.fileno(), select.EPOLLIN | select.EPOLLET)
                            connections[connection.fileno()] = connection
                            requests[connection.fileno()] = b''
                            responses[connection.fileno()] = response
                    except socket.error:
                        pass
                elif event & select.EPOLLIN:
                    try:
                        while True:  # caution: never ends if recv() keeps returning b'' after a client close
                            requests[fileno] += connections[fileno].recv(1024)
                    except socket.error:
                        pass
                    if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                        epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET)
                        print('-'*40 + '\n' + requests[fileno].decode()[:-2])
                elif event & select.EPOLLOUT:
                    try:
                        while len(responses[fileno]) > 0:
                            byteswritten = connections[fileno].send(responses[fileno])
                            responses[fileno] = responses[fileno][byteswritten:]
                    except socket.error:
                        pass
                    if len(responses[fileno]) == 0:
                        epoll.modify(fileno, select.EPOLLET)
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLHUP:
                    epoll.unregister(fileno)
                    connections[fileno].close()
                    del connections[fileno]
    finally:
        epoll.unregister(serversocket.fileno())
        epoll.close()
        serversocket.close()
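Example 4 combines EPOLLET with the event masks. A related flag, EPOLLONESHOT, can be combined in the same way. As a rough sketch of its effect, assuming a `socket.socketpair()` in place of a client connection, the code below polls a one-shot registration three times while the data stays unread:

```python
import select
import socket

receiver, sender = socket.socketpair()
epoll = select.epoll()
epoll.register(receiver.fileno(), select.EPOLLIN | select.EPOLLONESHOT)
sender.sendall(b'unread data')

first = epoll.poll(0)     # the event is delivered once...
second = epoll.poll(0)    # ...then the descriptor is disabled, unread data or not

# Re-arm with modify(); the descriptor is still registered, only disabled.
epoll.modify(receiver.fileno(), select.EPOLLIN | select.EPOLLONESHOT)
third = epoll.poll(0)

print(len(first), len(second), len(third))
epoll.close()
receiver.close()
sender.close()
```

Only the first and third polls report the socket, which is why a one-shot registration has to be renewed with epoll.modify() after each event.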
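Level-triggered mode is often used when porting an application that was built on the select or poll mechanisms, while edge-triggered mode is mainly used when the programmer does not want the operating system to help manage event state. In addition to these two modes, a socket can also be registered with the epoll object using the EPOLLONESHOT flag. With this flag, the registered event is reported by only one call to epoll.poll(); after that the socket is no longer monitored until it is re-armed with epoll.modify().
Performance Considerations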
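Listen Backlog Queue Size
In Examples 1-4, line 12 contains a call to listen(). Its parameter is the size of the listen backlog queue. It tells the operating system how many TCP/IP connections that have not yet been accepted may wait in the queue. Each call to accept() removes a connection from the queue and frees a slot for the next one. If the queue is full, new connections are silently ignored, which causes unnecessary delays on the client side. A server in production handles hundreds or thousands of connections, so a value of 1 is not adequate. For example, when using ab for load testing, any queue size below 50 degrades performance.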
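The role of the backlog queue can be seen in a short sketch. It assumes a loopback listener on a port chosen by the operating system, and the constant name `BACKLOG` and the client count of ten are arbitrary choices for illustration.

```python
import socket

BACKLOG = 50   # illustrative value, taken from the ab observation in the text

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('127.0.0.1', 0))    # port 0 asks the OS for any free port
serversocket.listen(BACKLOG)
port = serversocket.getsockname()[1]

# Ten clients connect before the server accepts any of them; the kernel
# completes the handshakes and parks the connections in the backlog queue.
clients = [socket.create_connection(('127.0.0.1', port)) for _ in range(10)]

accepted = []
for _ in clients:
    connection, _address = serversocket.accept()   # each call frees one slot
    accepted.append(connection)

print(len(accepted))
for sock in clients + accepted + [serversocket]:
    sock.close()
```

With the backlog of 1 used in the examples, most of these early connections would have had to wait and retry instead of being queued.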
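TCP Options
The TCP_CORK option holds messages back ("bottles them up") until they are ready to be sent. Lines 34 and 40 of Example 5 show how it is used; it may be useful for a server that uses HTTP/1.1 pipelining.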
Example 5
    import socket, select

    EOL1 = b'\n\n'
    EOL2 = b'\n\r\n'
    response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
    response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
    response += b'Hello, world!'

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)
    serversocket.setblocking(0)

    epoll = select.epoll()
    epoll.register(serversocket.fileno(), select.EPOLLIN)

    try:
        connections = {}; requests = {}; responses = {}
        while True:
            events = epoll.poll(1)
            for fileno, event in events:
                if fileno == serversocket.fileno():
                    connection, address = serversocket.accept()
                    connection.setblocking(0)
                    epoll.register(connection.fileno(), select.EPOLLIN)
                    connections[connection.fileno()] = connection
                    requests[connection.fileno()] = b''
                    responses[connection.fileno()] = response
                elif event & select.EPOLLIN:
                    requests[fileno] += connections[fileno].recv(1024)
                    if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                        epoll.modify(fileno, select.EPOLLOUT)
                        connections[fileno].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)  # cork before writing
                        print('-'*40 + '\n' + requests[fileno].decode()[:-2])
                elif event & select.EPOLLOUT:
                    byteswritten = connections[fileno].send(responses[fileno])
                    responses[fileno] = responses[fileno][byteswritten:]
                    if len(responses[fileno]) == 0:
                        connections[fileno].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0)  # uncork to flush
                        epoll.modify(fileno, 0)
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLHUP:
                    epoll.unregister(fileno)
                    connections[fileno].close()
                    del connections[fileno]
    finally:
        epoll.unregister(serversocket.fileno())
        epoll.close()
        serversocket.close()
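The cork and uncork pair from Example 5 can also be wrapped so that the option is always cleared again. The sketch below is one possible way to do that and is not part of the original article: the names `corked` and `demo` are made up, and the code only checks that the option is switched on and off, not how the kernel actually groups the packets.

```python
import contextlib
import socket


@contextlib.contextmanager
def corked(connection):
    """Set TCP_CORK while the block runs and clear it again on exit."""
    connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)
    try:
        yield connection
    finally:
        connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0)


def demo():
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))          # loopback, OS-chosen port
    listener.listen(1)
    client = socket.create_connection(listener.getsockname())
    connection, _address = listener.accept()

    with corked(connection):
        inside = connection.getsockopt(socket.IPPROTO_TCP, socket.TCP_CORK)
        connection.sendall(b'HTTP/1.0 200 OK\r\nContent-Length: 13\r\n\r\n')
        connection.sendall(b'Hello, world!')
    after = connection.getsockopt(socket.IPPROTO_TCP, socket.TCP_CORK)
    connection.close()

    received = b''
    while True:
        chunk = client.recv(1024)
        if not chunk:                        # server side closed the connection
            break
        received += chunk
    client.close()
    listener.close()
    return inside, after, received


if __name__ == '__main__':
    print(demo())
```

TCP_CORK is specific to Linux, so `socket.TCP_CORK` is not available on every platform.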
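The TCP_NODELAY option, by contrast, tells the operating system that data passed to socket.send() should be sent immediately. Line 14 of Example 6 shows how it is used; SSH and other "real-time" applications may want this option.
Example 6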
    import socket, select

    EOL1 = b'\n\n'
    EOL2 = b'\n\r\n'
    response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
    response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
    response += b'Hello, world!'

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    serversocket.bind(('0.0.0.0', 8080))
    serversocket.listen(1)
    serversocket.setblocking(0)
    serversocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # send without buffering

    epoll = select.epoll()
    epoll.register(serversocket.fileno(), select.EPOLLIN)

    try:
        connections = {}; requests = {}; responses = {}
        while True:
            events = epoll.poll(1)
            for fileno, event in events:
                if fileno == serversocket.fileno():
                    connection, address = serversocket.accept()
                    connection.setblocking(0)
                    epoll.register(connection.fileno(), select.EPOLLIN)
                    connections[connection.fileno()] = connection
                    requests[connection.fileno()] = b''
                    responses[connection.fileno()] = response
                elif event & select.EPOLLIN:
                    requests[fileno] += connections[fileno].recv(1024)
                    if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                        epoll.modify(fileno, select.EPOLLOUT)
                        print('-'*40 + '\n' + requests[fileno].decode()[:-2])
                elif event & select.EPOLLOUT:
                    byteswritten = connections[fileno].send(responses[fileno])
                    responses[fileno] = responses[fileno][byteswritten:]
                    if len(responses[fileno]) == 0:
                        epoll.modify(fileno, 0)
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLHUP:
                    epoll.unregister(fileno)
                    connections[fileno].close()
                    del connections[fileno]
    finally:
        epoll.unregister(serversocket.fileno())
        epoll.close()
        serversocket.close()
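Example 6 sets TCP_NODELAY on the listening socket. If the option should be guaranteed on one particular connection, it can also be set on the accepted socket and read back with getsockopt(). The following is a small sketch under that assumption, using a loopback listener on an OS-chosen port:

```python
import socket

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen(1)
client = socket.create_connection(listener.getsockname())
connection, _address = listener.accept()

# The listener above did not set TCP_NODELAY, so the option starts out off.
before = connection.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
after = connection.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)

print(before, after)
for sock in (connection, client, listener):
    sock.close()
```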
Source Code
The examples on this page are in the public domain and available for download.