zoukankan      html  css  js  c++  java
  • How To Use Linux epoll with Python

    How To Use Linux epoll with Python - ani_di的专栏 - 博客频道 - CSDN.NET

    How To Use Linux epoll with Python

    371人阅读 评论(0) 收藏 举报

    原文:http://scotdoyle.com/python-epoll-howto.html?

    How To Use Linux epoll with Python

    Contents

    Introduction

    Python2.6已包含了调用Linux epoll库的API。本文使用Python 3的例子简要地描述这个API和使用。欢迎提问和反馈信息。

    Blocking Socket Programming Examples

    例子1是一个Python 3写的简单服务器,它在8080端口listen HTTP请求,打印消息到控制台,然后发送反馈到客户端。

    • Line 9: 创建server socket.
    • Line 10: 设置SO_REUSEADDR,这样我们的程序就能与其它进程在监听同一个端口时。
    • Line 11: 编写本地所有IPv4地址的8080端口。
    • Line 12: 监听此socket上的连接。
    • Line 14: 程序会在此阻塞,直接有新的连接过来。调用accept函数会产生此连接的socket对象和请求的地址。
    • Lines 15-17: 组合请求所有信息。有关HTTP协议参考 HTTP Made Easy.
    • Line 18: 打印请求。
    • Line 19: 将消息发送回客户端
    • Lines 22-22: 关闭连接到客户端和监听socket。

    官方HOWTO 上有更多关于socket的编程细节。

    Example 1 (All examples use Python 3)

    [python:showcolumns] view plaincopyprint?
    ·········10········20········30········40········50········60········70········80········90········100·······110·······120·······130·······140·······150
    1. import socket  
    2. EOL1 = b'/n/n'  
    3. EOL2 = b'/n/r/n'  
    4. response  = b'HTTP/1.0 200 OK/r/nDate: Mon, 1 Jan 1996 01:01:01 GMT/r/n'  
    5. response += b'Content-Type: text/plain/r/nContent-Length: 13/r/n/r/n'  
    6. response += b'Hello, world!'  
    7. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
    8. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
    9. serversocket.bind(('0.0.0.0'8080))  
    10. serversocket.listen(1)  
    11. connectiontoclient, address = serversocket.accept()  
    12. request = b''  
    13. while EOL1 not in request and EOL2 not in request:  
    14.    request += connectiontoclient.recv(1024)  
    15. print(request.decode())  
    16. connectiontoclient.send(response)  
    17. connectiontoclient.close()  
    18. 22  serversocket.close()  

    Example 2 在第15行处加上循环,它将处理掺的连接直接用户中断。在此可能清楚地看到,server socket是从来不用做与客户端交换数据,而是为连接创建新的socket,由该新socket交换数据。

    23-24的 finally 语句用于砍server socket总是关闭,即使有异常发生(比较用户中断)。

    Example 2
     
    [python:showcolumns] view plaincopyprint?
    ·········10········20········30········40········50········60········70········80········90········100·······110·······120·······130·······140·······150
    1. import socket  
    2.   
    3. EOL1 = b'/n/n'  
    4. EOL2 = b'/n/r/n'  
    5. response  = b'HTTP/1.0 200 OK/r/nDate: Mon, 1 Jan 1996 01:01:01 GMT/r/n'  
    6. response += b'Content-Type: text/plain/r/nContent-Length: 13/r/n/r/n'  
    7. response += b'Hello, world!'  
    8.   
    9. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
    10. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
    11. serversocket.bind(('0.0.0.0'8080))  
    12. serversocket.listen(1)  
    13.   
    14. try:  
    15.    while True:  
    16.       connectiontoclient, address = serversocket.accept()  
    17.       request = b''  
    18.       while EOL1 not in request and EOL2 not in request:  
    19.           request += connectiontoclient.recv(1024)  
    20.       print('-'*40 + '/n' + request.decode()[:-2])  
    21.       connectiontoclient.send(response)  
    22.       connectiontoclient.close()  
    23. finally:  
    24.    serversocket.close(  

    Benefits of Asynchronous Sockets and Linux epoll

    Example 2 中的socket被称为blocking sockets。16行的 accept()调用会阻塞直接客户端有新的连接 。19行的recv()亦会阻塞直到接收满缓冲(或者数据已发送完毕)。21行的send()操作同样阻塞直接消息进入发送队列中。

    当使用 blocking sockets服务器模型时,通常会将每一个连接放在单独的线程(进程也可以)中处理。主程序只负责监听连接。它每次只接受一个请求,然后把新创建的socket分发到子线程中。于是所有的阻塞将分发到子线程中,不同客户之间的连接互不影响。

    blocking sockets非常直观且易于理解,但是它有很多 缺陷。在多线程间协调资源非常困难,并且这种模型在单核CPU上不是很高效。

    The C10K Problem 列出了一些处理并发socket的方法。其中之一就是 asynchronous sockets。这种 sockets 不会阻塞自己以等待事件发生,相反,当操作成功或失败后程序将会收到通知。通知中的信息将有助于决定下一步该如何处理。采用asynchronous sockets 的服务器可以用单线程来处理并发连接,而在访问阻塞资源时采用多线程,比如数据库。

    Linux 2.6 有许多方法来管理synchronous sockets,其中三个已经导出到Python API中,它们是 select, poll 和 epoll。epoll 和 poll 比 select 更好,因为select方法是需要手动检查每人socket是否有感兴趣事件发生,而前者是操作系统会在这些事件发生时通知应用程序。并且 epoll 要比 poll 更好,它由程序自己发出请求,Linux记录并返回一个队列表示那些socket有事件发生,这比每有一个事件发生都由操作系统通知一遍要好得多。所以 epoll 对于大量(上千)并发连接更有效,扩展也相对容易。 these graphs

    Asynchronous Socket Programming Examples with epoll

    下面程序展示了使用 epoll 的一般流程:

    1. 创建 epoll 对象
    2. 通知 epoll 对象监听特定sockets上的特定消息
    3. 询问自上一次询问前,有那些socket收到了指定的消息
    4. 处理这些socket
    5. 通知 epoll 对象修改监听sockets或消息类型
    6. 重复3到5操作,直接结束
    7. 销毁 epoll 对象

    Example 3 是 Example 2 的asynchronous sockets版本。它会更原来更复杂。

    • Line 1: 导入select模块,epoll调用包含在此模块中
    • Line 13: sockets默认为blocking,需要设置为non-blocking (asynchronous) 模式
    • Line 15: 创建 epoll 对象
    • Line 16: 注册 server socket上的关于读可用的消息(EPOLLIN -- Available for read)。
    • Line 19: 建立一个connection字典,将 file descriptors (integers) 映射到网络连接对象。
    • Line 21: 查询 epoll 对象是否有事件发生,参数”1“表示超时时间1秒。如果有任何事件发生,该函数会立即返回一个列表。
    • Line 22: 事件列表是里包含fileno和event code,fileno等于文件描述符(在整个系统中,它是独立的)。
    • Line 26: 注册新socket上的读可用消息。
    • Line 31: 如果可读消息来自于新socket发生的数据
    • Line 33: 当读操作完成后,修改注册到此socket的消息,改为写准备好事件,用于后面的回写操作。
    • Line 34: 打印读到的信息。
    • Line 35: 如果写新socket操作准备好
    • Lines 36-38: 发送数据给客户端。
    • Line 39: 发送完毕,不再监视此socket上的任何消息
    • Line 40: shutdown消息,通知双方结束了。
    • Line 41: 收到 HUP (hang-up) 事件表示连接中断。HUP事件不需要注册,epoll对象都会处理这个消息。
    • Line 42: 注销丢失连接的
    • Line 43: 关闭socket connection.
    • Lines 18-45: 保证epoll对象能正确关闭
    • Lines 46-48: 关闭server socket. (Python在程序结束时会自动关闭所有打开的文件/socket)
    Example 3
     
    [python:showcolumns] view plaincopyprint?
    ·········10········20········30········40········50········60········70········80········90········100·······110·······120·······130·······140·······150
    1. import socket, select   
    2.   
    3. EOL1 = b'/n/n'   
    4. EOL2 = b'/n/r/n'   
    5. response  = b'HTTP/1.0 200 OK/r/nDate: Mon, 1 Jan 1996 01:01:01 GMT/r/n'   
    6. response += b'Content-Type: text/plain/r/nContent-Length: 13/r/n/r/n'   
    7. response += b'Hello, world!'   
    8.   
    9. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   
    10. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)   
    11. serversocket.bind(('0.0.0.0'8080))   
    12. serversocket.listen(1)   
    13. serversocket.setblocking(0)   
    14.   
    15. epoll = select.epoll()   
    16. epoll.register(serversocket.fileno(), select.EPOLLIN)   
    17.   
    18. try:   
    19.    connections = {}; requests = {}; responses = {}   
    20.    while True:   
    21.       events = epoll.poll(1)   
    22.       for fileno, event in events:   
    23.          if fileno == serversocket.fileno():   
    24.             connection, address = serversocket.accept()   
    25.             connection.setblocking(0)   
    26.             epoll.register(connection.fileno(), select.EPOLLIN)   
    27.             connections[connection.fileno()] = connection   
    28.             requests[connection.fileno()] = b''   
    29.             responses[connection.fileno()] = response   
    30.          elif event & select.EPOLLIN:   
    31.             requests[fileno] += connections[fileno].recv(1024)   
    32.             if EOL1 in requests[fileno] or EOL2 in requests[fileno]:   
    33.                epoll.modify(fileno, select.EPOLLOUT)   
    34.                print('-'*40 + '/n' + requests[fileno].decode()[:-2])   
    35.          elif event & select.EPOLLOUT:   
    36.             byteswritten = connections[fileno].send(responses[fileno])   
    37.             responses[fileno] = responses[fileno][byteswritten:]   
    38.             if len(responses[fileno]) == 0:   
    39.                epoll.modify(fileno, 0)   
    40.                connections[fileno].shutdown(socket.SHUT_RDWR)   
    41.          elif event & select.EPOLLHUP:   
    42.             epoll.unregister(fileno)   
    43.             connections[fileno].close()   
    44.             del connections[fileno]   
    45. finally:   
    46.    epoll.unregister(serversocket.fileno())   
    47.    epoll.close()   
    48.    serversocket.close()   
     epoll 有两种模式,edge-triggered (边缘触发)level-triggered (电平触发). edge-triggered 下某个socket上的读写事件只会在epoll.poll()调用中返回一次。 所以程序要处理完该socket上的所有数据,下次poll()调用都会有响应。 When the data from a particular event is exhausted, additional attempts to operate on the socket will cause an exception. 相反,level-triggered 会一起重复通知直到所有的数据都处理完。

    举例来说,假设一个server socket注册了EPOLLIN事件,edge-triggered模式下,程序需要一直调用accept()直到socket.error异常

    发生为止。而level-triggered下可以按例子3那样做。

    例子 3 正是使用level-triggered模式,并且也是默认的模式。例子4将展示edge-triggered的使用方法。第25,36和45新增了一些循环,它们都需要一直重复直到异常发生。最好,第16,28,41和51行是切换到edge-triggered的方法。

    Example 4
    [python:showcolumns] view plaincopyprint?
    ·········10········20········30········40········50········60········70········80········90········100·······110·······120·······130·······140·······150
    1. import socket, select  
    2.   
    3. EOL1 = b'/n/n'  
    4. EOL2 = b'/n/r/n'  
    5. response  = b'HTTP/1.0 200 OK/r/nDate: Mon, 1 Jan 1996 01:01:01 GMT/r/n'  
    6. response += b'Content-Type: text/plain/r/nContent-Length: 13/r/n/r/n'  
    7. response += b'Hello, world!'  
    8.   
    9. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
    10. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
    11. serversocket.bind(('0.0.0.0'8080))  
    12. serversocket.listen(1)  
    13. serversocket.setblocking(0)  
    14.   
    15. epoll = select.epoll()  
    16. epoll.register(serversocket.fileno(), select.EPOLLIN | select.EPOLLET)  
    17.   
    18. try:  
    19.    connections = {}; requests = {}; responses = {}  
    20.    while True:  
    21.       events = epoll.poll(1)  
    22.       for fileno, event in events:  
    23.          if fileno == serversocket.fileno():  
    24.             try:  
    25.                while True:  
    26.                   connection, address = serversocket.accept()  
    27.                   connection.setblocking(0)  
    28.                   epoll.register(connection.fileno(), select.EPOLLIN | select.EPOLLET)  
    29.                   connections[connection.fileno()] = connection  
    30.                   requests[connection.fileno()] = b''  
    31.                   responses[connection.fileno()] = response  
    32.             except socket.error:  
    33.                pass  
    34.          elif event & select.EPOLLIN:  
    35.             try:  
    36.                while True:  
    37.                   requests[fileno] += connections[fileno].recv(1024)  
    38.             except socket.error:  
    39.                pass  
    40.             if EOL1 in requests[fileno] or EOL2 in requests[fileno]:  
    41.                epoll.modify(fileno, select.EPOLLOUT | select.EPOLLET)  
    42.                print('-'*40 + '/n' + requests[fileno].decode()[:-2])  
    43.          elif event & select.EPOLLOUT:  
    44.             try:  
    45.                while len(responses[fileno]) > 0:  
    46.                   byteswritten = connections[fileno].send(responses[fileno])  
    47.                   responses[fileno] = responses[fileno][byteswritten:]  
    48.             except socket.error:  
    49.                pass  
    50.             if len(responses[fileno]) == 0:  
    51.                epoll.modify(fileno, select.EPOLLET)  
    52.                connections[fileno].shutdown(socket.SHUT_RDWR)  
    53.          elif event & select.EPOLLHUP:  
    54.             epoll.unregister(fileno)  
    55.             connections[fileno].close()  
    56.             del connections[fileno]  
    57. finally:  
    58.    epoll.unregister(serversocket.fileno())  
    59.    epoll.close()  
    60.    serversocket.close()  
    level-triggered 通常用来移植以前用 selectpoll 模型的程序,而edge-triggered 模式主要用在当程序员不想让操作系统协助管理事件状态的情形。
    除了这两种操作外,socket还可以注册epoll时使用EPOLLONESHOT标记。有此标记的socket只会在第一次epoll.poll()成功有效,然后自动从监听队列中移除。

    Performance Considerations

    Listen Backlog Queue Size

    在例子 1-4中,第12行有一条listen()调用。传入的参数是监听队列的大小,它告诉操作系统允许多少没有被accept的TCP/IP连接可以放在监听队列中。每次调用accept(), 监听队列就会突出一个位置用于下个连接;如果队列已满,新的连接将直接忽略,以便不让客户端产生不必要的等待。真实场景只服务器会有成百上千的连接,所以这时传入1并不合适。例如,当使用 ab 做性能测试时,任何小于50的队列都会对性能产生影响。

    TCP Options

     TCP_CORK 选项可以等到消息装满(bottle up)后再发送。例子5的34和40行展示了其用法,可能对于使用HTTP/1.1管道的服务器有用

    Example 5
    1. import socket, select  
    2. EOL1 = b'/n/n'  
    3. EOL2 = b'/n/r/n'  
    4. response  = b'HTTP/1.0 200 OK/r/nDate: Mon, 1 Jan 1996 01:01:01 GMT/r/n'  
    5. response += b'Content-Type: text/plain/r/nContent-Length: 13/r/n/r/n'  
    6. response += b'Hello, world!'  
    7. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
    8. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
    9. serversocket.bind(('0.0.0.0'8080))  
    10. serversocket.listen(1)  
    11. serversocket.setblocking(0)  
    12. epoll = select.epoll()  
    13. epoll.register(serversocket.fileno(), select.EPOLLIN)  
    14. try:  
    15.    connections = {}; requests = {}; responses = {}  
    16.    while True:  
    17.       events = epoll.poll(1)  
    18.       for fileno, event in events:  
    19.          if fileno == serversocket.fileno():  
    20.             connection, address = serversocket.accept()  
    21.             connection.setblocking(0)  
    22.             epoll.register(connection.fileno(), select.EPOLLIN)  
    23.             connections[connection.fileno()] = connection  
    24.             requests[connection.fileno()] = b''  
    25.             responses[connection.fileno()] = response  
    26.          elif event & select.EPOLLIN:  
    27.             requests[fileno] += connections[fileno].recv(1024)  
    28.             if EOL1 in requests[fileno] or EOL2 in requests[fileno]:  
    29.                epoll.modify(fileno, select.EPOLLOUT)  
    30.                connections[fileno].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)  
    31.                print('-'*40 + '/n' + requests[fileno].decode()[:-2])  
    32.          elif event & select.EPOLLOUT:  
    33.             byteswritten = connections[fileno].send(responses[fileno])  
    34.             responses[fileno] = responses[fileno][byteswritten:]  
    35.             if len(responses[fileno]) == 0:  
    36.                connections[fileno].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0)  
    37.                epoll.modify(fileno, 0)  
    38.                connections[fileno].shutdown(socket.SHUT_RDWR)  
    39.          elif event & select.EPOLLHUP:  
    40.             epoll.unregister(fileno)  
    41.             connections[fileno].close()  
    42.             del connections[fileno]  
    43. finally:  
    44.    epoll.unregister(serversocket.fileno())  
    45.    epoll.close()  
    46.    serversocket.close()  
    相反, TCP_NODELAY 选项则通知操作系统,当调用socket.send()时数据应该立即发送。例子6中的14行展示了这种用法,像SSH或其它“实时”性程序可能会用到此选项。
    Example 6
    1. import socket, select   
    2.   
    3. EOL1 = b'/n/n'   
    4. EOL2 = b'/n/r/n'   
    5. response  = b'HTTP/1.0 200 OK/r/nDate: Mon, 1 Jan 1996 01:01:01 GMT/r/n'   
    6. response += b'Content-Type: text/plain/r/nContent-Length: 13/r/n/r/n'   
    7. response += b'Hello, world!'   
    8.   
    9. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   
    10. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)   
    11. serversocket.bind(('0.0.0.0'8080))   
    12. serversocket.listen(1)   
    13. serversocket.setblocking(0)   
    14. serversocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)   
    15.   
    16. epoll = select.epoll()   
    17. epoll.register(serversocket.fileno(), select.EPOLLIN)   
    18.   
    19. ry:   
    20.    connections = {}; requests = {}; responses = {}   
    21.    while True:   
    22.       events = epoll.poll(1)   
    23.       for fileno, event in events:   
    24.          if fileno == serversocket.fileno():   
    25.             connection, address = serversocket.accept()   
    26.             connection.setblocking(0)   
    27.             epoll.register(connection.fileno(), select.EPOLLIN)   
    28.             connections[connection.fileno()] = connection   
    29.             requests[connection.fileno()] = b''   
    30.             responses[connection.fileno()] = response   
    31.          elif event & select.EPOLLIN:   
    32.             requests[fileno] += connections[fileno].recv(1024)   
    33.             if EOL1 in requests[fileno] or EOL2 in requests[fileno]:   
    34.                epoll.modify(fileno, select.EPOLLOUT)   
    35.                print('-'*40 + '/n' + requests[fileno].decode()[:-2])   
    36.          elif event & select.EPOLLOUT:   
    37.             byteswritten = connections[fileno].send(responses[fileno])   
    38.             responses[fileno] = responses[fileno][byteswritten:]   
    39.             if len(responses[fileno]) == 0:   
    40.                epoll.modify(fileno, 0)   
    41.                connections[fileno].shutdown(socket.SHUT_RDWR)   
    42.          elif event & select.EPOLLHUP:   
    43.             epoll.unregister(fileno)   
    44.             connections[fileno].close()   
    45.             del connections[fileno]   
    46. finally:   
    47.    epoll.unregister(serversocket.fileno())   
    48.    epoll.close()   
    49.    serversocket.close()   

    Source Code

    The examples on this page are in the public domain and available for download.

  • 相关阅读:
    【译文】四十二种谬误(一)
    .NET笔记(二)
    设计模式C#实现(十六)——中介者模式
    设计模式C#实现(十五)——命令模式
    《程序员修炼之道》笔记
    《学会提问》读书笔记
    设计模式C#实现(十四)——责任链模式
    设计模式C#实现(十三)——享元模式(蝇量模式)
    学以致用——读《学会提问》
    访问苹果开发者网站太慢
  • 原文地址:https://www.cnblogs.com/lexus/p/2851543.html
Copyright © 2011-2022 走看看