本篇索引
(1)基本原理
(2)socket模块
(3)select模块
(4)asyncore模块
(5)asynchat模块
(1)基本原理
本篇指的网络编程,仅仅是指如何在两台或多台计算机之间,通过网络收发数据包;而不涉及具体的应用层功能(如Web服务器、 邮件收发、网络爬虫等等),那些属于应用编程的范畴,需要了解的可参看下一篇 Internet 应用编程。
关于使用Python进行网络通信编程,简单的例子网络上一搜一大把,但基本都是仅仅几行最简单的套接字代码, 用来做个小实验可以,但并不能实用。因为大多数Python的书和文档着重点在于讲Python语法, 并不会太细地把网络编程的底层原理给你讲清楚,比如:同步/异步的关系、线程并发监听的实现架构等等。 如果你要了解那些知识,需要去看《Unix网络编程》、《TCP/IP详解-卷1》之类的书。
本篇试图在讲Python网络编程的基础上,把涉及到的原理稍带整理一起描述一下。 一方面希望能帮到想进一步掌握Python网络编程的初学者、另一方面也方便我自己快速查阅用。
● IP地址、端口
每台电脑(服务器)都有一个固定的IP地址,而一台服务器上可能运行若干个不同的程序, 每个程序提供一种服务(比如:邮件服务程序、Web服务程序等等),每个不同的服务程序会占用一个端口号(也有占有多个端口的,比较少见), 端口(port)是一个16位数字,范围从065535。其中0~1023为保留端口,保留给特定的网络协议使用 (比如:HTTP固定使用80端口、HTTPS固定使用443端口)。一般你自己的服务程序可任意使用10000以上的端口。 它们的示意关系如下图所示:
由于要访问一个服务程序需要知道“一个IP地址和一个端口号”,因此两者加一起合称一个“地址(address)”。 在Python中,一个地址(address)一般用一个元组来表示,形如:address = (ipaddr, port)。
● 套接字
服务程序与客户端程序进行通行,需要通过一个叫做 socket(套接字)的媒介。socket 的本意是“插口”, 在网络通信中一般把它翻译成“套接字”。套接字的作用,就相当于在服务器程序和客户端程序之间建立了一根虚拟的专线, 服务器程序和客户端程序可以分别通过自己这端的套接字,向对方写入和读出数据 (在Python中,套接字一般为一个 socket 类型的实例),如此即可实现服务器和客户端的数据通信。 在服务器程序中,同一个端口可生成若干个套接字,每个套接字跟一个特定的客户端进行通信。 在客户端,如果与一个服务程序通信,一般只需生成一个套接字即可。 如下图所示:
● 编码问题
由于网络是以ascii文本格式传输数据的,而在Python3中,所有字符串都是Unicode编码的。 因此,将字符串通过网络发送时必须转码。而从网络收到数据时,也必须进行解码以转换成Python的字符串。
发送时,可使用字符串的encode()
方法进行转码,也可直接使用内置的bytes类型。 接收时,可使用字符串的decode()
方法进行解码。
# 转码示例 s.send('Hello world!'.encode('ascii')) # 方法一:使用encode()转码 s.send(b'Hello world!') # 方法二:直接发送bytes类型(字节序列) # 解码示例 recv_data = s.recv(1024) recv_str = recv_data.decode('ascii') # 使用decode()解码
(2)socket模块
socket模块提供了最原始的仿UNIX的网络编程方式,因为它非常底层,所以很适合用来说明网络编程的概念, 但在实际工作中基本上不太会直接用socket模块去编写网络程序。实际工作中, 一般都会使用Python库中提供的更加方便的模块或类(比如SocketServer等)来编写网络程序。
● 基本的UDP编程模型
UDP的编程模型比较简单,虽然服务器 socket 和客户端 socket 也是一对一通信,但是一般发完数据就放手, 服务器程序不需要花心思去管理多个客户端的连接。大体流程示意可参看下图:
在服务器程序端,先生成一个套接字,然后通过bind()
方法绑定到本地地址和特定端口,之后就可以通过recvfrom()
方法监听客户端数据了。 recvfrom()
方法为阻塞运行,即:如果客户端没有新的数据进来,服务器程序会僵在这里, 只有等到客户端有新的数据进来,这个方法才会返回,然后继续运行后面的语句。 上图是一个基本示意,各个方法的详细解释可参看后文的表格。
以下为一个UDP服务器程序的示例:
# UdpServer.py # 功能:接收客户端数据,将客户端发过来的字符串加个头“echo:”再回发过去) import socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.bind(("", 10000)) # 服务器程序绑定本地10000端口,空字符串表示本地IP地址 while True: data, address = s.recvfrom(256) print("Received a connection from %s" % str(address)) s.sendto(b"echo:" + data, address)
以下为UDP客户端测试程序:
# UdpClient.py import socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # AF_INET指IPv4,SOCK_DGRAM指UDP,后面会有详释 s.sendto(b"Hello", ("127.0.0.1", 10000)) # 服务器地址和端口(客户端一般会由操作系统随机分配发送端口) resp, addr = s.recvfrom(256) print(resp) s.sendto(b"World", ("127.0.0.1", 10000)) resp, addr = s.recvfrom(256) print(resp) s.close()
需要注意的是,在网络编程中,服务器程序和客户端程序是需要一定配合的, 需要避免进入双方都在等对方数据的卡住状态,如下图所示:
● 基本的TCP编程模型
使用UDP通信的服务器程序一般不太需要太复杂的编程技术。而如果使用TCP通信, 不使用“并发”或“异步”或“select()”编程技术基本是没法实用的。在实用中,一般只要使用这三种技术中的一种就可以了。 简单来说:“并发”是指多进程或多线程编程;“异步”是指在操作系统中先注册某种事件,当这个事件发生时, 由操作系统回调你事先注册的函数;“select()”方法后面会专门解释。
这里为说明概念,先演示最原始的单进程、单线程、什么技术都不用的原始TCP通信模型,如下图所示:
以下为一个TCP服务器程序示例:
# TcpServer.py # 功能:接收客户端的TCP连接,打印客户端发送过来的字符串,并将服务器本地时间发给客户端 from socket import * import time s = socket(AF_INET, SOCK_STREAM) # AF_INET指IPv4,SOCK_STREAM至TCP,后面会有详释 s.bind(('', 10001)) # 服务器程序绑定本地10000端口 s.listen(5) while True: s1, addr = s.accept() print("Got a connection from %s" %str(addr)) data = s1.recv(1024) print("Received: %s" %data.decode('ascii')) timestr = time.ctime(time.time()) + " " s1.send(timestr.encode('ascii')) s1.close()
以下为TCP客户端测试程序:
# TcpClient.py from socket import * s = socket(AF_INET, SOCK_STREAM) s.connect(('127.0.0.0.1', 10001)) s.send(b'Hello') tm = s.recv(1024) s.close() print("The time is %s" % tm.decode('ascii'))
TCP的编程需要服务器程序管理若干个 socket,所以编程模型与上面的UDP略有不同, 多了一个listen()
和accept()
步骤。listen()
等会儿再讲, 先讲accept()
。
在示例程序中我们可以看到s1, addr = s.accept()
的用法。其中,s 是原始的用于监听端口10001的套接字实例, accept()
方法会阻塞运行。当有客户端发起connect()
连接时,accept()
方法会接受这个连接, 并返回一个元组:分别是新套接字实例 s1 、客户端地址 addr。s1 用于与这个客户端通信,s 仍然用于监听端口10001, 看有没有新的客户端连入。
之后运行的recv()
方法,也是阻塞运行的。当这个客户端没有发送新的数据过来时, 服务器主流程就会僵在这里,无法继续往下运行。如果有新的客户端请求连接时,只能在操作系统中排队等待。 前面的listen()
方法就是用来定义操作系统中这个等待队列的长度的, 其入参即可指定操作系统中在这个监听套接字 s 上允许排队等待的最大客户端数量。 以前,在不使用前面提到的并发等3个编程技巧时,一般这个值需要为1024或者更多, 而如果使用了并发等编程技巧,一般这个值只需要5就足够了。
当 s1 与客户端通讯完毕,需要调用close()
方法关闭这个套接字。 在套接字关闭后,程序主流程再次回到上面的s1, addr = s.accept()
语句,继续监听新的连接。 若此时已经有客户端在操作系统中排队等待,则会立即从操作系统中取出一个等待的客户端,然后建立新的套接字实例。 若无等待的客户端,则本语句会阻塞,直到下一次有客户端connect()
进来时,再返回。
很显然,这种同时只能处理一个客户端连接的服务器程序是没法用的, 如果前一个客户端与服务器通信的时间比较长,那新的客户端连接请求只能在操作系统中排队等待, 而无法立即与服务器建立通信,后面我们将看到,如何用并发等编程技术解决这个问题。
以下为一个通信时间较常的TCP客户端测试程序:
# TcpClient.py from socket import * import time s = socket(AF_INET, SOCK_STREAM) s.connect(('127.0.0.0.1', 10001)) time.sleep(5) # 与服务器建立连接后,不放手,先等5秒钟再发送数据 s.send(b'Hello') tm = s.recv(1024) s.close() print("The time is %s" % tm.decode('ascii'))
你可以开2个终端运行这个通信时间较常的客户端程序,看看服务器是怎样反应的。
另外,可以比较一下以前用纯C语言写TCP服务器程序,作为参考:
// TcpServer.c #include <netinet/in.h> #include <string.h> #include <time.h> int main(int argc, char **argv) { int listenfd, connfd; char buff[4096]; time_t ticks; struct sckaddr_in servaddr; listenfd = Socket(AF_INET, SOCK_STREAM, 0); memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(13); Bind(linstenfd, (SA *)&servaddr, sizeof(servaddr)); Listen(listenfd, 1024); for(;;) { connfd = Accept(listenfd, (SA *) NULL, NULL); ticks = time(NULL); snprintf(buff, sizeof(buff), "%.24s ", ctime(&ticks)); Write(connfd, buff, strlen(buff)); Close(connfd); } }
● 采用并发技术的TCP编程模型
并发是指采用子进程或多线程方式进行编程。并发编程的核心思想是,当与客户端的连接建立后, 在主线程(或父进程)内不要有使用recv()
等可能造成阻塞的行为, 这些有可能导致阻塞的行为都通信都交给其他后台线程(或子进程)去做, 主线程(或父进程)永远只阻塞在accept()
上,负责监听新的连接并立即处理。
以下以线程并发为例,示意并发的TCP的编程模型:
以下为一个线程并发的TCP服务器程序:
# TcpServerThreading.py from socket import * import time, threading s = socket(AF_INET, SOCK_STREAM) s.bind(('', 10001)) s.listen(5) thread_list = [] def client_commu(client_socket): data = client_socket.recv(1024) print("Received: %s" %data.decode('ascii')) timestr = time.ctime(time.time()) + " " client_socket.send(timestr.encode('ascii')) client_socket.close() while True: ns, addr = s.accept() print("Got a connection from %s" %str(addr)) t = threading.Thread(target=client_commu, args=(ns,)) t.daemon = True # 将新线程处理成后台线程,主线程结束时将不等待后台线程 thread_list.append(t) t.start()
代码比较简单,很容易看懂。核心思想就如前述:每来一个新的客户端连接,就开一个新线程负责与这个客户端通信, 而主线程永远只阻塞在accept()
上监听新连接。
● socket模块的函数
以下为 socket 模块中可用的函数、方法、属性的详细解释,大部分都同 UNIX 中的同名用法。
(1)模块函数
函数或变量 | 说明 |
---|---|
模块变量 | |
has_ipv6 |
布尔值,支持IPv6时为 True。 |
连接相关 | |
socket(family, type [,proto]) | 新建套接字,返回一个 SocketType 类型的实例。family 为IP层协议,常用:AF_INET(IPv4)、AF_INET6(IPv6)。type 为套接字类型,常用:SOCK_DGRM(UDP)、SOCK_STREAM(TCP)。proto 为协议编号,通常省略(默认为0) |
socketpair([family [,type [,proto]]]) | 仅适用于创建family 为 AF_UNIX 的“UNIX域”套接字。该概述主要用于设置 os.fork()
创建的进程之间的通信。例如:父进程调用 socketpair() 创建一对套接字,然后父进程和子进程
就可以使用这些套接字相互通信了。 |
fromfd(fd, family, type [,proto]) | 通过整数文件描述符创建套接字对象,文件描述符必须引用之前创建的套接字。 该方法返回一个 SocketType 实例。 |
create_connection(address [,timeout]) | 建立与address 的TCP连接,并返回已连接的套接字对象。
address 为:(host, port) 形式的元组,timeout 指定一个可选的超时期。 |
查看主机信息 | |
gethostname() | 返回本机的主机名。 |
getfqdn([name]) | 若忽略入参name ,则返回本机主机名。其他详查文档。 |
gethostbyname(hostname) | 将主机名hostname (如:'www.python.org')转换为 IP 地址。不支持 IPV6。
这个函数会自动去查询Internet上的地址。 |
gethostbyname_ex(hostname) | 将主机名hostname (如:'www.python.org')转换为 IP 地址。
但返回元组:(hostname, aliaslist, ipaddrlist),其中hosthame是主机名,
aliaslist是同一个地址的可选主机名列表,ipaddrlist是同一个主机上同一个接口的IPv4地址列表。 |
gethostbyaddr(ip_address) | 返回信息与上面 gethostbyname_ex() 相同,但入参为IP地址。 |
getaddrinfo(host, port [,family [,socktype [,proto [,flags]]]]) | 给定关于主机的host 和port 信息,返回值为包含5个元素的元组:
(family, socktype, proto, cannonname, sockaddr),可视为 gethostbyname() 函数的增强版。
|
getnameinfo(address, flags) | 给定套接字地址address (为(ipaddr, port) 形式的元组),将其转换为
flag 指定的地址信息,主要用于获取与地址有关的详细信息。详可查看文档。 |
查询协议信息 | |
getprotobyname(protocolname) | 将协议名称(如:'icmp')转换为协议编号(如:IPROTO_ICMP的值), 以便传给 socket() 函数的第3个参数。 |
getservbyname(servicename [,protocolname]) | 将 Internet 服务名称和协议名称转换为该服务的端口号。
protocolname 可以为:'tcp'或'udp'。例如:getservbyname('ftp','tcp') |
getservbyport(port [,protocolname]) | 与上面相反,通过端口号查询服务名称。如果没有任何服务用于指定端口, 则引发 socket.error 错误。 |
超时信息 | |
getdefaulttimeout() | 返回默认的套接字超时秒数(浮点数),None表示不设置任何超时期。 |
setdefaulttimeout(timeout) | 为新建的套接字对象设置默认超时期,入参为超时秒数(浮点数),若为 None 表示没有超时(默认值) |
转码相关 | |
htonl(x) | 将主机的32位整数x 转为网络字节顺序(大尾)。 |
htons(x) | 将主机的32位整数x 转为网络字节顺序(小尾)。 |
ntohl(x) | 将来自网络的32位整数(大尾)x 转换为主机字节顺序。 |
ntohs(x) | 将来自网络的32位整数(小尾)x 转换为主机字节顺序。 |
inet_aton(ip_string) | 将字符串形式的IPv4地址(如:'127.0.0.1')转换成32位二进制分组格式,用作地址的原始编码。 返回值是由4个二进制字符组成的字符串(如:b'x7fx00x00x01')。在将地址传递给C程序时比较有用。 |
inet_ntoa(packed_ip) | 与上面 inet_aton() 功能相反。常用于从C程序传来的地址数据解包。 |
inet_pton(family, ip_string) | 功能与上面 inet_aton() 类似,但支持IPv6,family 可指定地址族。 |
inet_ntop(family, packed_ip) | 与 inet_pton() 功能相反,用于解包地址。 |
(2)套接字属性和方法
属性和方法 | 说明 |
---|---|
属性 | |
s.family |
套接字地址族(如:AF_INET)。 |
s.type |
套接字类型(如:SOCK_STREAM)。 |
s.proto |
套接字协议编号。 |
连接相关方法 | |
s.bind(address) | 通常为服务器用。将套接字绑定到特定地址和端口。address 为元组形式的:
(hostname, port),注意 hostname 必须要加引号,空字符串、'localhost'都表示本机IP地址。 |
s.listen(backlog) | 通常为服务器用。指定操作系统能在本端口上最大可以等待的还未被accept()处理的连接数量。 |
s.accept() | 通常为服务器用。接受连接并返回 (conn, address),其中conn 是新的套接字对象,
可以用这个新的套接字和某个连入的特定客户端通讯。
address 是另一端的套接字地址端口信息,为(hostname, port)元组。
|
s.connect(address) | 通常为客户端用。连接到远端address 指定的地址和端口(为 (hostname, port) 元组形式)。
如果有错误则引发 socket.error。
|
s.connect_ex() | 与上类似,但是成功时返回0,失败时返回 errno 的值。 |
s.close() | 关闭套接字。服务器客户端都可使用。 |
s.shutdown(how) | 关闭1个或2个连接。若how 为 s.SHUT_RD,则不允许接收;
若为 s.SHUT_WR,则不允许发送;若为 s.SHUT_RDWR,则接收和发送都不允许。 |
UDP 数据读写 | |
s.recvfrom(bufsize [,flags]) | UDP专用。返回 (data, address) 对,address 为 (hostname, port) 元组形式。
bufsize 指定要接收的最大字节数。flags 通常可以忽略(默认为0),
详可查看文档。 |
s.recvfrom_info(buffer [,nbytes [,flags]]) | 与 recvfrom() 类似,但接收的数据存储在入参对象buffer 中,
nbytes 指定要接收的最大字节数,如忽略则最大为buffer 大小。
flags 同上。 |
s.sendto(string [,flags] ,address) | UDP专用。将string 发送到address 指定的地址和端口
(为 (hostname, port) 元组形式)。返回发送的字节数。flags 同上。 |
TCP 数据读写 | |
s.recv(bufsize [,flags]) | 接收套接字数据,数据以字符串形式返回。bufsize 指定要接收的字节数。
flags 通常可以忽略(默认为0),详可查看文档。 |
s.recv_into(buffer [,nbytes [,flags]]) | 与 recv() 类似,但将数据写入支持缓冲区接口的对象buffer 中,
nbytes 指定要接收的最大字节数,如忽略则最大为buffer 大小。
flags 含义同上。 |
s.send(string [,flags]) | 将string 中的数据发送到套接字,flags 含义同上。
返回发送的字节数量(可能小于string 中的字节数),如有错误则抛出异常。 |
s.sendall(string [,flags]) | 将string 中的数据发送到套接字,但在返回之前会尝试发送所有数据。
成功则返回 None,失败则抛出异常。flags 含义同上。 |
套接字参数相关方法 | |
s.getsockname() | 返回套接字自己的地址端口,通常为一个元组:(ipaddr, port)。 |
s.getpeername() | 返回远端套接字的地址端口,通常为一个元组:(ipaddr, port),并非所有系统都支持该函数。 |
s.gettimeout() | 返回当前套接字的超时秒数(浮点数),如果没有设置超时期,则返回None。 |
s.getsockopt(level, optname [,buflen]) | 返回套接字选项的值。level 定义选项的级别,
optname 为特定的选项。 buflen 表示接收选项的最大长度,通常可忽略。 |
s.settimeout(timeout) | 设置套接字操作的超时秒数(浮点数),设None表示没有超时。如果发生超时, 则引发 socket.timeout 异常。 |
s.setblocking(flag) | 若flag 设为0,则套接字为非阻塞模式。在非阻塞模式下,
s.recv() 和 s.send() 调用将立即返回,若 s.recv() 没有发现任何数据、或者
s.send() 无法立即发送数据,那么将引发 socket.error 异常。
|
s.setsockopt(level, optname, value) | 设置给定套接字选项的值。参数含义同 s.getsockopt() |
文件相关 | |
s.fileno() | 返回套接字的文件描述符。 |
s.makefile([mode [,bufsize]]) | 创建与套接字关联的文件对象。mode 和bufsize 的含义与内置
open() 函数相同,文件对象使用套机子文件描述符的复制版本。 |
s.ioctl() | 受限访问 Windows 上的 WSAIoctol 接口。详可查阅文档。 |
● socket模块的异常
socket模块定义了以下异常:
异常 | 说明 |
---|---|
error | 继承自OSError,表示与套接字或地址有关的错误。它返回一个 (errno, mesg) 元组(错误编号、错误消息) 以及底层调用返回的错误。 |
herror | 继承自OSError,表示与地址有关的错误。它返回一个 (errno, mesg) 元组(错误编号、错误消息)。 |
timeout | 继承自OSError,套接字操作超时时出现的异常。异常值是字符串 'timeout'。 |
gaierror | 继承自OSError,表示 getaddrinfo()和 getnameinfo() 函数中与地址有关的错误。 它返回一个 (errno, mesg) 元组(错误编号、错误消息)。 |
errno 为socket模块中定义的以下常量之一:
常量 | 描述 | 常量 | 描述 |
---|---|---|---|
EAI_ADDRFAMILY | 不支持地址族 | EAI_NODATA | 没有与节点名称相关的地址 |
EAI_AGAIN | 名称解析暂时失效 | EAI_NONAME | 未提供节点名称或服务名称 |
EAI_BADFLAGS | 标志无效 | EAI_PROTOCOL | 不支持该协议 |
EAI_BADHINTS | 提示不当 | EAI_SERVICE | 套接字类型不支持该服务名称 |
EAI_FAIL | 不可恢复的名称解析失败 | EAI_SOCKTYPE | 不支持该套接字类型 |
EAI_FAMILY | 主机不支持的地址组 | EAI_SYSTEM | 系统错误 |
EAI_MEMORY | 内存分配失败 |
(3)select模块
select模块可使用select()
和poll()
系统调用。
select()
通常用来实现轮询,可以在不使用线程或子进程的情况下,
实现与多个客户端进行通讯。它的用法直接模仿原始UNIX中的select()
系统调用。
在 Linux 中,它可以用于文件、套接字、管道;在 Windows 中,它只能用于套接字。
poll()
函数可以直接利用Linux底层的poll()系统调用
,
Windows不支持poll()
函数。
● select()
使用select()
实现同时与多个客户端通信的核心编程思想是:select()
函数可以阻塞在多个套接字上,只要这些套接字中有一个收到数据或收到连接,
select()
就会返回,并且在返回值中包含这个收到数据的套接字。
然后用户自己的服务器程序可以根据返回值自行判断,是哪个客户端对应的套接字收到了数据,
若返回的套接字是最原始的监听套接字,则说明有新客户端的连接请求。
select()
函数的语法如下:
select(rlist, wlist, xlist [,timeout])
查询一组文件描述符的输入、输出和异常状态。前3个参数rlist
、wlist
、 xlist
都是列表,每个列表包含一系列文件描述符或类似文件描述符的对象(当某个对象具有 fileno() 方法时,它就是类似文件描述符的对象,比如:套接字)。 rlist
为输入文件描述符的列表、wlist
为输出文件描述符的列表、 xlist
为异常文件描述符的列表,这3个列表都可以是空列表。
一般情况下,本函数为阻塞运行,即当入参的上述3个列表中若没有事件发生,则本函数将阻塞挂起。 timeout
参数为指定的超时秒数(浮点数),若忽略则为阻塞运行, 若为0则函数仅将执行一次轮询并立即返回。
当有事件在入参的3个列表中发生时,本函数即返回。返回值是一个列表元组:(rs, ws, xs), rs 是入参rlist
的子集,为rlist
中发生期待事件的文件描述符列表; 比如:若入参rlist
为一系列套接字,若有一个或多个套接字收到数据, 那么select()
将返回,并且在 rs 中包含这些收到数据的套接字。
同样的:ws 是入参wlist
的子集,只要wlist
中的任何一个或多个文件描述符允许写入,那么select()
将立即在 ws 中返回这个子集。 因此,往入参wlist
中放入元素时必须十分小心。 最后,xs 是入参xlist
的子集。
如果超时时没有对象准备就绪,那么将返回3个空列表。如果发生错误,那么将触发 select.error 异常。
以下为一个使用 select() 实现的服务器例子,功能为在服务器屏幕打印从客户端收到的任何数据,直到客户端关闭连接为止:
# select_server.py import socket, select s = socket.socket() s.bind(('', 10001)) s.listen(5) inputs = [s] while True: rs, ws, es = select.select(inputs, [], []) # 阻塞运行,若无新的事件本函数会挂起 for r in rs: if r is s: c, addr = s.accept() print('Got connection from', addr) inputs.append(c) else: try: data = r.recv(1024) disconnected = not data except socket.error: disconnected = True if disconnected: print(r.getpeername(), 'disconnected') inputs.remove(r) else: print(data)
上面程序中,入参 inputs 的初始值只包含一个监听套接字s,当收到客户端的连接请求时, select()
函数会返回,并且在 rs 中包含这个套接字。 然后s.accept()
会新生成一个套接字 c,服务器程序会将其放入 inputs 列表。 以后若是收到这个客户端的数据,则select()
返回时的 rs 中会包含这个新套接字 c, 若是收到其他客户端的连接请求时,则select()
返回时的 rs 中会包含原始套接字 s。 之后的程序靠判断 rs 中究竟是哪个套接字,来决定后续的行为。
最后,若客户端调用close()
关闭连接(本质上是发送一个长度为0的数据:b''), 则服务器收到这个0长度数据后,在屏幕打印关闭连接的客户端地址,并将这个与之对应的套接字移出 input 队列。
● poll()
poll()函数可创建利用poll()系统调用
的“轮询对象”,Windows不支持 poll() 函数。
poll()返回的轮询对象支持以下方法:
方法 | 说明 |
---|---|
p.register(fd [,eventmask]) | 注册新的文件描述符fd ,fd 为一个文件描述符、 或一个类似文件描述符的对象(当某个对象具有fileno() 方法时,它就是类似文件描述符的对象, 比如套接字)。eventmask 可取值见下表,可以“按位或”。 如果忽略eventmask ,则仅检查 POLLIN, POLLPRI, POLLOUT 事件。 |
p.unregister(fd) | 从轮询对象中删除文件描述符fd ,如果没有注册,则引发 KeyError 异常。 |
p.poll([timeout]) | 对所有已注册的文件描述符进行轮询。timeout 位可选的超时毫秒数(浮点数)。 返回一个元组列表,列表中每个元组的形式为:(fd, event),其中 fd 是文件描述符列表、 event 是指示事件的位掩码(含义见下表)。 例如,要检查 POLLIN 事件,只需使用event & POLLIN 测试值即可。 如果返回空列表,则表示到达超时值且没有发生任何事件。 |
eventmask
和event
支持的事件:
常量 | 描述 | 常量 | 描述 |
---|---|---|---|
POLLIN | 可用于读取的数据 | POLLERR | 错误情况 |
POLLPRI | 可用于读取的紧急数据 | POLLHUP | 保持状态 |
POLLOUT | 准备写入 | POLLNVAL | 无效请求 |
以下为一个使用 poll() 实现的服务器例子:
import socket, select s = socket.socket() s.bind(('', 8009)) s.listen(5) fdmap = {} p = select.poll() p.register(s) while True: events = p.poll() # 阻塞运行,若无新的事件本函数会挂起 for fd, event in events: if fd == s.fileno(): c, addr = s.accept() print('Got connection from', addr) p.register(c) fdmap[c.fileno()] = c elif event & select.POLLIN: data = fdmap[fd].recv(1024) if not data: print(fdmap[fd].getpeername(), 'disconnected') p.unregister(fd) del fdmap[fd] else: print(data)
总体来说,poll()
的使用比select()
略为简单。上面程序中,首先通过 p.register(s)
注册要监听的套接字,然后调用events = p.poll()
等待连接或数据,当p.poll()
返回时,即遍历其返回值。若fd为监听套接字 s 的文件描述符,则通过调用s.accept()
新建一个与此客户端通信的套接字, 然后其通过p.register(c)
注册进监听事件,再将这个套接字放入字典 fdmap 以备以后可直接通过 fd 拿出套接字。
之后,每当收到新的数据,若非监听套接字 s 收到数据,则说明是与客户端通信的某个套接字 c 收到了数据,则通过data = fdmap[fd].recv(1024)
把数据收进来。若收到数据长度为0, 说明是用户端关闭套接字,则在本处理程序中,使用p.unregister(fd)
解除对这个套接字的监听。最后在 fdmap 字典中删除这个套接字的索引。
(4)asyncore模块
asyncore模块用来编写“异步”网络程序(内部核心原理是使用select()
系统调用), 它可以用于希望提供并发性但又无法使用多线程(或子进程)的环境。
回忆一下异步的核心思想:当发生某事件时(比如收到客户端数据、或收到新的客户端连接请求等等), 由操作系统来回调运行你先前为这个事件定义好的函数或方法。这些事先定义好的函数或方法只会由操作系统来调用, 而不会影响你自己程序的主流程。
不过由于asyncore模块过于底层,一般工作中不太会直接使用asyncore模块编写网络程序, 而会用其他更高级的模块(如:asynchat等),这里仅仅用asyncore模块来说明异步网络编程的基本方法。
asyncore模块主要提供了一个 dispatcher 类,其所有功能都几乎都由 dispatcher 类提供, dispatcher 类内部封装了一个普通套接字对象,其初始化语法如下:
dispatcher([sock])
上面的 dispatch() 函数定义事件驱动型非阻塞套接字对象(比较拗口哈)。sock
是现有的套接字对象。 如果忽略该参数,则后面需使用 create_socket() 方法创建套接字。一般我们在编程中通过继承 dispatcher 类并重定义它的一些方法,来实现自己需要的功能。
● dispatcher 对象支持以下方法
方法或函数 | 说明 |
---|---|
可重定义的基类方法 | |
d.handle_accept() | 收到新连接时系统会自动调用该方法。 |
d.handle_connect() | 作为客户端进行连接。 |
d.handle_close() | 套接字关闭时系统会自动调用该方法。 |
d.handle_error() | 发生未捕获的异常时系统会自动调用该方法。 |
d.handle_expt() | 收到套接字外带数据时系统会自动调用该方法。 |
d.handle_read() | 从套接字收到新数据时,系统会自动调用该方法。 |
d.handle_write() | 当 d.writable() 方法返回True时,系统会自动调用该方法。 |
d.readable() | 内部的select()方法使用该函数查看对象是否准备读取数据,如果是则返回 True。 接下来系统会自动调用 d.handle_read() 来读取数据。 |
d.writable() | select()方法使用该函数查看对象是否想写入数据,如果是则返回 True。 |
底层方法(直接操作其内部的套接字) | |
d.create_socket(family, type) | 新建套接字,参数含义与底层 socket() 相同。 |
d.bind(address) | 将套接字绑定到address ,address 是一个 (host, port) 元组。 |
d.listen([backlog]) | 监听传入连接,参数含义与底层 listen() 相同。 |
d.accept() | 接受连接,返回元组 (client, addr),其中client 是新建的套接字对象, addr 是客户端的地址/端口元组。 |
d.close() | 关闭套接字 |
d.connect(address) | 建立连接,address 是一个 (host, port) 元组。 |
d.recv(size) | 最大接收size 个字节,返回空字符串表示客户端已关闭了通道。 |
d.send(data) | 发送数据data (字符串) |
asyncore 模块的函数 | |
loop([timeout [,use_poll [,map[,count]]]]) | 无限轮询事件。使用 select() 函数进行轮询,如果use_poll 参数为True, 则使用 poll() 进行轮询。timeout 表示超时秒数,默认为30秒。 map 是一个字典,包含所有要监视的通道。count 指定返回之前要执行的轮询操作次数(默认为None,即一直轮询,直到所有通道关闭) |
● asyncore的使用示例
下例展示了一个asyncore的服务器程序,它的功能是:当收到客户端发送过来的任何数据时, 在服务器屏幕上显示这个收到的数据,并将服务器本地时间发送给客户端。 由客户端决定何时关闭连接。
# asyncore_server.py import asyncore import socket import time # 该类仅处理“接受连接”事件 class asyncore_server(asyncore.dispatcher): def __init__(self, port): asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.bind(('', port)) self.listen(5) def handle_accept(self): client, addr = self.accept() return asyncore_tcp_handler(client) # 该类为每个具体客户端生成一个实例,并处理服务器和这个客户端的通讯 class asyncore_tcp_handler(asyncore.dispatcher): def __init__(self, sock = None): asyncore.dispatcher.__init__(self, sock) self.writable_flag = False def handle_read(self): recv_data = self.recv(4096) if len(recv_data) > 0: print(recv_data) self.writable_flag = True def writable(self): return self.writable_flag def handle_write(self): timestr = time.ctime(time.time()) + " " bytes_sent = self.send(timestr.encode('ascii')) self.writable_flag = False def handle_close(self): print('The client is closed.') self.close() a = asyncore_server(10001) # 创建监听服务器 asyncore.loop() # 无限轮询
程序要点分析如下:
(1)本程序定义了一个 asycore_server 类和一个 asyncore_tcp_handler 类, 都继承自asyncore.dispatcher
。前者(asycore_server类)用于监听所有的新连接事件, 后者(asyncore_tcp_handler类)用于处理与某个已建立连接的具体服务端通信。
(2)程序的最下面两行:先建立一个 asycore_server 的实例,然后进入无限循环, 监听 10001 端口的所有新连接事件。
(3)当有新的客户端连入时,系统会自动回调此监听实例的 handle_accept()
方法, 在这个方法中,我们通过调用底层的accept()
方法,得到一个新的套接字 client, 并用这个新套接字生成一个 ascycore_tcp_handler 实例,负责与这个客户端一对一通信。
(4)当已建立连接的客户端向服务器发送数据时,系统会自动调用 asyncore_tcp_handler 实例的 handle_read()
方法。在这个方法中,我们通过调用底层的recv()
方法, 得到客户端法来的数据,并将其 print 到服务器屏幕上,然后将我们自定义的实例属性 writable_flag
设为 True。
(5)由于我们已经重写了实例的writable()
方法,当我们在上面将实例属性 writable_flag
设为 True时,这个writable()
方法也会返回 True。 由于系统在后台不停地在监视writable()
方法的返回值,当发现这个方法返回值为 True时,系统即自动调用本实例的 handle_write()
方法。
(6)在handle_write()
方法中,我们通过调用底层方法send()
, 将本地时间发送给客户端。发送完后别忘了将writable_flag
属性设回 False, 否则系统会不停地调用handle_write()
方法。
(7)当客户端提出关闭连接时(即客户端调用:close()
方法), 系统回会自动调用本实例的handle_close()
方法,我们可以在此方法中调用底层的 close()
方法,关闭服务端与此客户端的连接的连接,然后本实例就会自动销毁。
以下是一个客户端的例子,用来测试这个服务器:
from socket import * import time s = socket(AF_INET, SOCK_STREAM) s.connect(('127.0.0.1', 10001)) # 第一次发送数据并接收 s.send('Hello'.encode('ascii')) tm = s.recv(1024) print("The time is %s" % tm.decode('ascii')) # 等待1秒钟 time.sleep(1) # 第二次发送数据并接收 s.send('World'.encode('ascii')) tm = s.recv(1024) print("The time is %s" % tm.decode('ascii')) # 关闭连接 s.close()
(5)asynchat模块
asynchat模块将asyncore的底层I/O功能进行了封装,提供了更高级的编程接口, 非常适用于基于简单请求/响应机制的网络协议(如 HTTP)。
asynchat模块提供了一个名为async_chat
的基类,用户需要继承这个基类, 并自定义两个必要的方法:incoming_data()
和found_terminator()
。 当网络收到数据时,系统会自动调用incoming_data()
方法。
对于发送数据,async_chat
在内部实现了一个 FIFO 队列, 用户可以通过调用push()
方法将要发送的数据压入队列,然后就不用管了, 系统会自动在网络可发送时,将 FIFO 队列中的数据发送出去。
可使用以下函数,定义async_chat
的实例,sock
是与客户端一对一通信的套接字对象。
async_chat([sock])
async_chat
的实例除了继承了asyncore.dispatcher
基类提供的方法之外, 还具有以下自己的方法:
方法 | 说明 |
---|---|
a.collect_incoming_data(data) | 通道收到数据时系统会自动调用该方法。data 是本实例套接字通道收到的数据, 用户必须自己实现该方法,在该方法中用户通常需要将收到的数据保存起来已供后续处理。 |
a.set_terminator(term) | 设置本实例套接字通道的终止符,term 可以是字符串、整数或者 None。 如果term 是字符串,则在输入流出现该字符串时,系统会自动调用 a.found_terminator() 方法。如果term 是整数,则它指定一次收的字节数, 当通道收到指定的字节数后,系统自动调用方法。 如果term 是 None,则持续收集数据。 |
a.get_terminator() | 返回本实例套接字通道的终止符。 |
a.found_terminator() | 当本实例的套接字通道收到由本实例的set_terminator() 方法设置终止符时, 系统会自动调用该方法。该方法必须由用户实现。 通常,它会处理此前由collect_incoming_data() 方法保存的数据。 |
a.push(data) | 将数据压入 FIFO 队列,data 是要发送的字节序列。 |
a.discard_buffers() | 丢弃 FIFO 队列中保存的所有数据。 |
a.close_when_done() | 将 None 压入 FIFO 队列,表示传出数据流已到达文件尾。 当系统从 FIFO 中读到 None 时将关闭本套接字通道。 |
a.push_with_producer(producer) | 将一生产者对象producer 加入到生产者 FIFO 队列。 producer 可以是任何具有方法more() 的对象。 重复调用本方法可以将多个生产者对象推入生产者 FIFO 队列。 |
simple_producer(data [,buffer_size]) | 这是 asynchat 模块为a.push_with_producer() 单独定义的类, 可以用来创建简单的生产者对象,从字节序列data 生成数据块, buffer_size 指定数据块大小(默认512)。 |
asynchat 模块总是和 asyncore 模块一起使用。一般使用asyncore.dispatch
实例来监听端口, 然后由 asynchat 模块的async_chat
的子类实例来处理与每个客户端的连接。下面是一个简单的实例, 服务器在屏幕打印任何从客户端收到的数据,当发现终止符b'
'
时, 向客户端发送服务器本地时间,并关闭这个套接字。
# asynchat_server.py import asynchat, asyncore, socket import time class asyncore_http(asyncore.dispatcher): def __init__(self, port): asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.bind(('', port)) self.listen(5) def handle_accept(self): client, addr = self.accept() return asynchat_tcp_handler(client) class asynchat_tcp_handler(asynchat.async_chat): def __init__(self, conn=None): asynchat.async_chat.__init__(self, conn) self.data = [] self.got_terminator = False self.set_terminator(b' ') def collect_incoming_data(self, data): if not self.got_terminator: self.data.append(data) print(data) def found_terminator(self): self.got_terminator = True timestr = time.ctime(time.time()) + " " self.push(timestr.encode('ascii')) self.close_when_done() a = asyncore_http(10001) asyncore.loop()
以上例子对比前面的纯使用 asyncore 模块的例子,在写与客户端通信的程序时,要简洁很多。
(6)socketserver模块
socketserver模块包括很多TCP、UDP、UNIX域 套接字服务器实现的类,用它们来编写服务器程序非常方便。 要使用该模块,用户必须继承并实现2个类:一个是 Handler 类(事件处理程序)、一个是 Server 类(服务器程序)。 这两个类需要配合使用。
● Handler 类(事件处理程序)
用户需要自定义一个 Handler 类(继承自基类BaseRequestHandler
), 其中需自定义实现以下方法:
方法 | 说明 |
---|---|
h.setup() | 对本实例进行一些初始化工作,默认情况下,它不执行任何操作。 如果用户希望在处理网络连接前,先作一些配置工作(如建立 SSL 连接), 那么可以改写该方法。 |
h.handle() | 当 Server 类监听到新的客户端连接请求或收到来自已连接的客户端的数据, 系统将自动回调这个函数。在这个函数中,用户可以自定处理客户端连接或数据。 |
h.finish() | 完成h.handle() 方法后,系统会自动回调此本方法作一些清理工作。 默认情况下,它不执行任何操作。如果执行h.setup() 或h.handle() 时发生异常, 则不会调用本方法。 |
BaseRequestHandler 实例的一些可用属性:
属性 | 说明 |
---|---|
h.request |
对于 TCP 连接,是本实例内置的套接字对象。 对于 UDP 连接,是包好收到数据的字节字符串。 |
h.client_address |
为客户端的(地址, 端口)元组。 |
h.server |
本实例对应的 Server 实例。 |
h.rfile |
可以像操作文件对象一样,从h.rfile 读取客户端数据 (用例如:data = h.rfile.readline())。 |
h.wfile |
可以像操作文件对象一样,向h.wfile 写入数据, 这些数据会被传送到已建立连接的客户端 (用例如:h.wfile.write('Hello'.encode('ascii')) )。 |
BaseRequestHandler
还有两个派生类,用于简化操作。 如果用户仅使用 TCP 进行通信,那么自定义的 Handler 类可继承自StreamRequestHandler
类。 如果用户仅使用 UDP 进行通信,那么自定义的 Handler 类可继承自DatagramRequestHandler
类。 在这两种情况下,用户仅需实现h.handle()
方法就可以了。
● Server 类(服务器程序)
定义完上面的 Handler 类后,用户还需要定义一个 Server 类。 socketserver 模块提供了5个可供用户继承的类,分别是:
● BaseServer(address, handler)
;
● UDPServer(address, handler)
:继承自 BaseServer;
● TCPServer(address, handler)
:继承自 BaseServer;
● UnixDatagramServer(address, handler)
:继承自 UDPServer,UNIX域专用;
● UnixStreamServer(address, handler)
:继承自 TCPServer,UNIX域专用;
其中入参address
为 (ipaddr, port) 元组,
handler
为用户为此 Server 实例配对的自定义 Handler 类(注意是“类”,不是实例)。
用户可根据自己的连接类型,自行选择继承相应的 Server 类实现服务程序。
Server 实例具有以下共有方法和属性:
方法或属性 | 说明 |
---|---|
s.fileno() | 返回本实例对应的套接字的文件描述符,使得本实例可供select() 直接使用。 |
s.serve_forever() | 进入无限循环,处理本实例对应端口的所有请求。 |
s.shutdown() | 停止s.serve_forever() 无限循环。 |
s.server_address |
本实例监听的(地址, 端口)元组。 |
s.socket |
本实例对应的套接字对象。 |
s.RequestHandlerClass |
本实例对应的 Handler 类(事件处理)。 |
Server 还可以定义以下“类变量”来配置一些基本参数;以下的“类方法”一般不必动,但也可以改写:
类变量或类方法 | 说明 |
---|---|
Server.socket_type |
服务器使用的套接字类型,如socket.SOCK_STREAM 或
socket.SOCK_DGRAM 等。 |
Server.address_family |
服务器套接字使用的地址族,如:socket.AF_INET 等。 |
Server.request_queue_size |
传递给套接字的listen() 方法的队列值大小,默认值为 5。 |
Server.timeout |
服务器等待新请求的超时秒数,超时期结束后,服务器会自动回调本类的
Server.handle_timeout() 类方法。 |
Server.allow_reuse_address |
布尔标志,指示套接字是否允许重用地址。在程序终止后,一般其他程序若要使用本端口, 需要等几分钟时间。但若此标志为 True,则其他程序可在本程序结束后立即使用本端口。 默认为 False。 |
Server.bind() | 对服务器执行bind() 操作。 |
Server.activate() | 对服务器执行listen() 操作。 |
Server.handle_timeout() | 服务器发生超时时会自动回调本方法。 |
Server.handle_error(request, client_address) | 此方法处理操作过程中发生的未处理异常,若要获取关于上一个异常的信息,
可使用 traceback 模块的sys.exc_info() 或其他函数。 |
Server.verify_request(request, client_address) | 在进一步处理之前,如果需要验证连接,则可以重新定义本方法。 本方法可以实现防火墙功能或执行某写验证。 |
● socketserver 使用示例
以下为一个单进程、单线程的 socketserver 服务器程序示例:
# my_socketserver.py from socketserver import TCPServer, StreamRequestHandler import time class MyTCPHandler(StreamRequestHandler): def handle(self): print('Got connection from: ', self.client_address) while True: recv_data = self.request.recv(1024) if len(recv_data): print(recv_data) if b' ' in recv_data: resp = time.ctime() + " " self.request.send(resp.encode('ascii')) else: print(self.client_address, ' Disconnected') break; class MyTCPServer(TCPServer): allow_reuse_address = True serv = MyTCPServer(('', 10001), MyTCPHandler) serv.serve_forever()
在上面的示例程序中,用户定义了两个继承类:MyTcpHandler 用于处理客户端连接和客户端数据, MyTcpServer 用于定义服务器类。
(1)在主程序中,先初始化一个 serv 实例,并为其绑定服务器地址/端口和 Handler 类。之后, 即调用 serv 实例的 serve_forever()
方法,进入无限循环监听端口。 此时会在 serv 实例内部自动生成一个 MyTCPHandler 的实例,用以监听服务器端口并处理数据。
(2)当客户端发起连接时,系统会自动回调内部 MyTCPHandler 的实例的handle()
方法。 在此方法中,示例程序使用while True:
和self.request.recv()
结构, 接收从客户端发来的数据。
(3)若客户端发来普通数据,则在服务器在屏幕上打印这个发来的数据。 若客户端发来的数据中含有换行符 b' ',则处理程序将本地时间发送给客户端。
(4)若客户端关闭连接(即发送长度为0的数据),则处理程序通过break
语句退出 while True:
循环,并结束handle()
方法,此时服务端也会在内部关闭连接, 并销毁这个内部的 MyTCPHandler 实例。再生成一个新的 MyTCPHandler 实例来监听和处理下一次客户端的连接。
(5)需要理解的是:对于这种单进程单线程的服务器程序,当前一个客户端与服务器程序还处于连接状态时, 下一个客户端是无法连入这个服务器程序的,只能在操作系统层面等待(listen()
函数的入参 即是用来指示:这个端口在操作系统层面可以等待的客户端的队列的长度)。 只有当前一个客户端关闭连接后,服务器程序才能从操作系统的等待队列中,取出下一个客户端进行处理。
以下是客户端测试程序的例子:
# client.py from socket import * import time s = socket(AF_INET, SOCK_STREAM) s.connect(('127.0.0.1', 10001)) s.send('Hello'.encode('ascii')) time.sleep(1) s.send('World '.encode('ascii')) tm = s.recv(1024) print("The time is %s" % tm.decode('ascii')) s.close()
● socketserver的并发处理
在前面的例子中,服务器程序不能同时处理多个客户端的连接,只能等一个客户端关闭连接后, 再处理下一个客户端的数据。socketserver 模块提供了非常方便的并发扩展功能, 只要将上面的程序稍作修改,就能变成“子进程”或“多线程”并发模式,同时处理若干个客户端的连接。
简单来讲,socketserver 模块提供了几个UDPServer
和TCPServer
的派生类, 用以实现并发功能,这些派生类分别是:
● ForkingUDPServer(address, handler)
:UDPServer 的子进程并发版(Windows不支持);
● ForkingTCPServer(address, handler)
:TCPServer 的子进程并发版(Windows不支持);
● TheadingUDPServer(address, handler)
:UDPServer 的多线程并发版;
● TheadingTCPServer(address, handler)
:TCPServer 的多线程并发版;;
在实际使用中,只要从以上几个类继承实现自己的 Server 类就可以了。对,就是这么简单!
比如,对于上面的服务器示例程序,只要将程序中的TCPServer
改成TheadingTCPServer
,
就变成了多进程并发服务器程序,程序会为每个客户端连接创建一个独立的线程,可同时与多个客户端进行通信。
修改后的多线程版服务器程序如下:
# my_socketserver.py from socketserver import ThreadingTCPServer, StreamRequestHandler import time class MyTCPHandler(StreamRequestHandler): def handle(self): print('Got connection from: ', self.client_address) while True: recv_data = self.request.recv(1024) if len(recv_data): print(recv_data) if b' ' in recv_data: resp = time.ctime() + " " self.request.send(resp.encode('ascii')) else: print(self.client_address, ' Disconnected') break; class MyTCPServer(ThreadingTCPServer): allow_reuse_address = True serv = MyTCPServer(('', 10001), MyTCPHandler) serv.serve_forever()
对于ForkingUDPServer
和ForkingTCPServer
,额外有以下控制属性:
属性 | 说明 |
---|---|
max_children |
子进程的最大数量 |
timeout |
收集僵尸进程的操作时间间隔 |
active_children |
跟踪正在运行多少个活动进程 |
对于TheadingUDPServer
和TheadingTCPServer
,额外有以下控制属性:
属性 | 说明 |
---|---|
daemon_threads |
若设为True,则这些线程都变成后台线程,会随主线程退出而退出。 默认为 False。 |