0. 前言
没有前言。
1. 一个简单的开始
import socket
response = 'HTTP/1.1 200 OK
Connection: Close
Content-Length: 1
A'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(32)
while True:
client, clientaddr = server.accept() # blocking
request = client.recv(4096) # blocking
client.send(response) # maybe blocking
client.close()
这可能是我写过最简单的服务器了。
解释一下程序中的变量。
response
服务器的返回。
作为示例,我使用了一个简单的 HTTP
响应作为服务器的回复,返回一个字母 'A' ,这样我就可以使用各种 HTTP
的工具来检测服务器的性能了(比如 ab
和 wrk
,后面的例子我会用 wrk
)。
server
服务器的 socket 对象。
client
客户端的 socket 对象。
clientaddr
客户端的地址,以 (ip,port)
形式由 accept
创建。
request
客户端发来的请求数据。
每章开始的完整示例代码都可以直接运行,没有依赖,但是否能运行在 Python 3 上我还没有测试过。欢迎测试过的朋友告诉我。
1.1 Socket 插口
server = socket.socket()
创建 server
的 socket
,如果在 C
语言中,socket()
函数应该是返回一个 int
类型的文件描述符(File Descriptor
简称 fd
),在 Windows
下通常也叫做名柄。
在 Python
中,是由 socket
库封装的一个 socket
对象,如果想获取原始的文件描述符,可以调用函数 server.fileno()
,它会返回 int
类型的 fd
。
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
这行代码的作用是保证在 TIME_WAIT
状态下 bind() 调用可以成功,能够使服务器退出之后,可以被立即重启。你可以自己实验一下去掉这行代码之后,服务器重启的表现。更多关于 TIME_WAIT
的内容可以自己 Google 一下。如果比较懒那只要记得在每次创建服务器 socket 的时候都要把 SO_REUSEADDR 参数设成 1 就行了。
server.bind(('0.0.0.0', 8080))
绑定到本机的IP和端口,0.0.0.0 代表本机所有的 IPv4 的 IP 。
server.listen(32)
listen 创建一个接收连接的队列,参数 32 是允许等待的连接的数量。
while True
的循环体是服务器的处理逻辑,也是这篇文章的核心内容。这里只是简单的接收客户端连接( accept
调用) 和客户端数据( recv
调用)并发送回复 ( send
调用)。最后关闭连接 (close
调用)。出于易于理解的目的,我没有写任何错误处理。
client
变量和 server
变量一样,在 C
中是一个 fd
,在 Python
中是一个 socket
的对象。
1.2 Blocking 阻塞
我在三处会发生阻塞的地方加了注释。
第一处阻塞
client, clientaddr = server.accept() # blocking
当调用的函数是一个阻塞的系统调用时,如果没有满足的数据,这个系统调用不会返回,进程会被操作系统挂起,直到有满足的数据才会将进程唤醒。比如 accept
在此处就会发生阻塞。
第二处阻塞
request = client.recv(4096) # blocking
recv
和 accept
一样。
第三处可能的阻塞
client.send(response) # maybe blocking
send
调用是 可能阻塞 。
在内核的 socket
实现中,会有两个缓存 (buffer)。read buffer 和 write buffer 。当内核接收到网卡传来的客户端从数据后,把数据复制到 read buffer ,这个时候 recv
阻塞的进程就可以被唤醒。
当调用 send
的时候,内核只是把 send
的数据复制到 write buffer 里,然后立即返回。只有 write buffer 的空间不够时 send
才会被阻塞,需要等待网卡发送数据腾空 write buffer 。在 write buffer 的空间足够放下 send
的数据时进程才可以被唤醒。
这个例子是阻塞的,在 accept
阻塞的时候,无法调用 recv
接收客户端数据。在 recv
阻塞的时候也无法调用 accept
接受新的请求。所以它同时只能接受和处理一个请求,如果一个客户端发送数据的速度比较慢,会拖慢整个服务器响应的速度。
但在局域网的情况下却很快,我用 wrk 测试一下这个简单 server 的性能。
$ wrk http://127.0.0.1:8080
Running 10s test @ http://127.0.0.1:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 7.36ms 37.23ms 359.84ms 96.43%
Req/Sec 2.98k 1.03k 3.69k 84.91%
16293 requests in 10.08s, 0.90MB read
Requests/sec: 1616.60
Transfer/sec: 91.57KB
还不错。
2. 用进程或线程处理多个客户端
既然在 第1章 中因为 accept
和 recv
互相阻塞,服务器才不能同时处理多个请求,那么把它们拆出来不就好了?
这是早期大多数服务器使用的办法,具体实现有下面几种。
2.1 CGI 的方式
代码.
import os
import sys
import socket
import multiprocessing
response = 'HTTP/1.1 200 OK
Connection: Close
Content-Length: 1
A'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(32)
def handler(client):
request = client.recv(4096)
client.send(response)
client.close()
sys.exit()
while True:
client, addr = server.accept()
process = multiprocessing.Process(target=handler, args=(client,))
process.daemon = True
process.start()
这个方法利用了 UNIX
中子进程会继承父进程所有 fd
的特性。
代码开始的部分和 第1章 是一样的,这里的 accept
依然会阻塞,但一旦新的连接建立,我会创建一个新的进程去调用 recv
和 send
(handler
函数在新的进程中执行)。在主进程中立即再次调用 accept
准备接收下一个客户端连接。
CGI 虽然 能处理多个客户端的请求,可是它实在太慢了。打个比方,第1章 的代码相当于一辆汽车在一条公路上行驶,虽然每次只能送一个人,但它可以开得飞快。在 CGI 中变成了无数辆汽车,一旦汽车的数量超过道路的负载就堵车了。
用 wrk
测试一下性能
$ wrk http://127.0.0.1:8080
Running 10s test @ http://127.0.0.1:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 23.01ms 5.14ms 52.45ms 78.82%
Req/Sec 214.53 35.85 303.00 76.00%
4296 requests in 10.06s, 243.33KB read
Requests/sec: 427.08
Transfer/sec: 24.19KB
实在太慢了。
为了解决这个问题,我有个新的办法。
2.2 Thread 线程
先看代码
import os
import sys
import socket
import threading
response = 'HTTP/1.1 200 OK
Connection: Close
Content-Length: 1
A'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(32)
def handler(client):
request = client.recv(4096)
client.send(response)
client.close()
while True:
client, clientaddr = server.accept()
thread = threading.Thread(target=handler, args=(client,))
thread.daemon = True
thread.start()
代码和 CGI 几乎是一样的,只是把进程换成了线程。既然给汽车太多会堵车,那么给每个人发一辆自行车好了,这样一条路就可以让更多的人通过了。
显然这样做是治标不治本啊,自行车多了,不还是会堵车吗。我又想到了新的办法。
2.3 Prefork
代码
import os
import sys
import socket
import multiprocessing
response = 'HTTP/1.1 200 OK
Connection: Close
Content-Length: 1
A'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(32)
def handler():
while True:
client, addr = server.accept()
request = client.recv(4096)
client.send(response)
client.close()
sys.exit()
processes = []
for i in range(0, 4):
process = multiprocessing.Process(target=handler, args=())
process.start()
processes.append(process)
for process in processes:
process.join()
聪明的我发现,如果道路只允许固定的汽车同时通过能够保证道路的畅通,而且其它汽车等待的时间比堵车的时间还要短。那我干脆只允许每次只有固定数量的汽车通过好了,如果路上没有位置,就让他等待别的车行驶到终点。如果路上还有空余的位置,就立即通行。这样的设计即能同时处理多个请求,也能保证不会让系统变得很慢。
这个设计简直太聪明了。现在有相当多的服务器是这样实现的。
因为 UNIX
子进程会继承父进程的所有 fd
。显然 server
变量也不例外,这样只要在子进程中调用 accept
和 recv
就好了(相当于多个 第1章 中的实例),即使阻塞也只会阻塞一个进程。这个方式能够同时处理请求的数量等于子进程的数量。
你可能会有一个问题,“如果多个子进程同时阻塞在 accept
上,那么新的连接来了应该唤醒谁?”
这个现象叫“惊群效应”,在早期 Linux
内核中,每个阻塞在 accept
的子进程都会被唤醒,但只有一个进程能成功建立连接,其它的进程会返回一个错误。新的 Linux
内核已经不存在这个问题,每次只会唤醒一个进程,其它进程继续等待新的连接。
这个时候想,如果把 Prefork
中的 “汽车” 换成 “自行车” 会怎样?
2.4 ThreadPool 线程池
ThreadPool 有两个版本,第一个版本在主线程中 accept
子线程中 recv
和 send
import os
import sys
import time
import Queue
import socket
import threading
response = 'HTTP/1.1 200 OK
Connection: Close
Content-Length: 1
A'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(32)
def handler(queue):
while True:
client = queue.get()
request = client.recv(4096)
client.send(response)
client.close()
thread.exit()
queue = Queue.Queue()
threads = []
for i in range(0, 4):
thread = threading.Thread(target=handler, args=(queue,))
thread.daemon = True
thread.start()
threads.append(thread)
while True:
client, clientaddr = server.accept()
queue.put(client)
第二个版本,子线程同时 accept
并 recv
和 send
import os
import sys
import time
import socket
import threading
response = 'HTTP/1.1 200 OK
Connection: Close
Content-Length: 1
A'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
server.bind(('0.0.0.0', 8080))
server.listen(32)
def handler():
while True:
client, clientaddr = server.accept()
request = client.recv(4096)
client.send(response)
client.close()
thread.exit()
threads = []
for i in range(0, 4):
thread = threading.Thread(target=handler, args=())
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
这个就是 “Prefork
” 的线程池版。现在也有相当多的服务器是用这个方法实现的。
很难说 Prefork
和 ThreadPool
哪个更好,如果业务相对独立 Prefork
提供了更好的隔离性,进程之间不能访问彼此的内存空间,而且即使有一个子进程甚至主进程崩溃都不会影响正常的子进程。但如果需要共享数据的话,Prefork
是相当的麻烦。
当需要共享大量数据的时候 ThreadPool
用起来会更顺手。但也同时需要注意多线程临界区的加锁等。实现不好可能会变成死锁或者效率太低。
在线程池的版本中,第一个 ThreadPool
中的例子在 queue
上会有读写锁(主线程写,子线程读)。不过可以用 Lock Free 算法实现一个精巧的 Ring Buffer 避免这个问题,后边服务器优化的文章再介绍。
第二个 ThreadPool
的例子和 prefork
版本几乎是一模一样的。依赖内核的 accept
的实现。
2.5 SO_REUSEPORT Linux 独有的方式
在 Linux 3.9
版的内核中增加了 SO_REUSEPORT
支持。允许多个 socket 绑定同到一个 (ip, port)
,由内核负责向不同的 socket
分发连接。
server.setsockopt(socket.SOL_SOCKET, socket. SO_REUSEPORT, 1)
实现相当于把第一章的代码加上上面一行,并且多开几个实例就可以了。这个功能非常简单好用。但注意只在 Linux
下能用,BSD
上虽然也有 SO_REUSEPORT
但并不能实现内核分发的功能。
3. Nonblocking 非阻塞
不好意思,前面的两章其实只是铺垫。Nonblocking 的服务器,才是现在主流技术。
第二章中的 CGI, Thread, Prefork, ThreadPool 的目标只有一个,解决阻塞调用拖慢服务器的问题。费了这么大的功夫,为什么不直接把“阻塞”的函数变成“不阻塞”的呢?
上代码
import os
import fcntl
import socket
import select
response = 'HTTP/1.1 200 OK
Connection: Close
Content-Length: 1
A'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(32)
# set nonblocking
flags = fcntl.fcntl(server.fileno(), fcntl.F_GETFL)
fcntl.fcntl(server.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
clients = set([])
while True:
try:
client, clientaddr = server.accept()
clients.add(client)
except Exception as e:
pass
for client in clients.copy():
try:
request = client.recv(4096)
client.send(response)
clients.remove(client)
client.close()
except Exception as e:
pass
这次的代码有点儿复杂,我慢慢讲。
flags = fcntl.fcntl(server.fileno(), fcntl.F_GETFL)
fcntl.fcntl(server.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
这两行代码把 server
设置成非阻塞模式。
现在 server
上的 accept
调用就成了非阻塞的了,并且在 accept
成功返回的 client
也会继承 server
的非阻塞特性,所以对 client
的 recv
调用,也是非阻塞的。
在非阻塞模式下调用阻塞函数(accept
和 recv
),如果并没有可用的数据, 原本应阻塞的函数会直接返回 EAGAIN
或 EWOULDBLOCK
错误,程序可以继续执行,不会被操作系统挂起。
EAGAIN
和 EWOULDBLOCK
的错误码通常是相同的。但从兼容上考虑,需要两个错误分别判断。
在 Python
中是以 异常 的形式返回这两个错误的,我用 Exception
就为了捕获这两个错误码,处理方式是什么都不做。
try:
client, clientaddr = server.accept()
clients.add(client)
except Exception as e:
pass
这是处理非阻塞的 accept
调用,如果 accept
成功返回,就把返回的 client
放到一个 clients
的数组中,准备下一步接收 client
的数据。如果没有新的连接,accept
抛出异常,我们直接 pass
到下一步。
现在进程不会被操作系统挂起,将立即执行下面的代码。
for client in clients.copy():
try:
request = client.recv(4096)
client.send(response)
clients.remove(client)
client.close()
except Exception as e:
pass
这段代码并不难理解,for
循环遍历所有已经建立连接的 client
并 尝试 调用 recv
。如果 recv
返回数据,继续调用 send
发送回复,然后把 client
从 clients
中删除并关闭。如果抛出异常就 pass
到下一个 client
。
因为 Python
不允许在遍历一个数组的时候修改数组的内容 ( clients.remove(c)
),所以我用遍历的是 clients.copy()
。
实际上,这个例子中的代码是有严重性能问题的,假定以下情况。
- 没有新的连接请求。
- 现在也没有任何连接,
clients
数组是空的。
当满足这两情况下的时候,服务器负载为 0 。会发生什么?
accept
会立即失败进入for
循环。for
循环 0 次立即进入下一次循环。
服务器逻辑相当于下面的代码。
while True:
pass
进入了一个无意义的循环消耗 CPU
,这个叫做 busy wait
。这种情况在服务器编程中是不允许出现的。
那么如何避免这样的情况发生?方法可能让你大跌眼镜。
用 blocking 的方式。我们躲避了半天的 blocking ,又回来了。
4. I/O Multiplexing IO多路复用
这是电子通信的术语。把它用在非阻塞编程上,也是恰到好处。
4.1 Select 的例子
代码
import os
import fcntl
import socket
import select
response = 'HTTP/1.1 200 OK
Connection: Close
Content-Length: 1
A'
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8080))
server.listen(32)
flags = fcntl.fcntl(server.fileno(), fcntl.F_GETFL)
fcntl.fcntl(server.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
clients = set([])
while True:
rlist = client.copy()
rlist.add(server)
rlist, wlist, xlist = select.select(rlist, [], [], 10)
for fd in rlist:
if fd == server:
client, clientaddr = server.accept()
clients.add(client)
else:
request = fd.recv(4096)
fd.send(response)
clients.remove(fd)
fd.close()
和第3章中非阻塞的代码很像,不同的是增加了一个 select
的系统调用。现在 server
依然是非阻塞的,但 select
调用是阻塞的,这样就解决了第三章非阻塞面临的 busy wait
的问题。
那么 select
是如何工作的呢?
在第3章中我说过一个非阻塞的 fd
调用 accept
或 recv
时,如果没有可以返回的数据,会立即返回错误 EAGAIN
或 EWOULDBLOCK
。这也是导至第3章服务器 busy wait
的元凶。
但如果把一个非阻塞的 fd
作为参数传到 select
中,在没有数据返回的时候,不会返回错误,而是阻塞在调用 select
的地方。这个阻塞,与第1章和第2章中提到的是一样的。
与第1,2两章不同的是,select
可以同时处理多个 fd
。也就是代码中的 rlist
。如果 rlist
里任何一个 fd
可以被 recv
(如果是 server
,则是 accept
) ,select
函数会返回相应的 fd
,所以在后面的处理中 accept
和 recv
我并没有用 try catch
。
不要小瞧这个可以处理多个 fd
的调用,它可帮了我的大忙,它就是我们的“多路复用”。
下面要介绍一下 select
。
select
函数有 4 个参数
- 第一个参数是
read list
,准备将来执行recv
的fd
数组。 - 第二个参数是
write list
, 准备将来执行send
的fd
数组。 - 第三个参数是
error list
,可能会发生错误的fd
数组。 - 第四个参数是一个超时时间。
前三个参数都是 fd
的数组,分为三个我们感兴趣的事件,可读(read
),可写(write
) 和 错误(error
)。accept
和 recv
均是可读事件。
最后一个参数是在没有任何事件发生的超时时间,在代码中我设置的是 10 秒,如果 10 秒内没有任何新连接或数据,select
会返回空的事件数组。现在的代码里的超时并没有什么用,但如果要实现 timer
的话,这个参数必不可少。
在 C
中的 select
要比 Python
中复杂不少,需要处理比较多的语言和系统的细节。Python
中的封装就好很多,非常易于我们理解,感谢 Python
。
未完待续,下一部分我会介绍 epoll
以及对写事件的处理。如果这一部分有不清楚的,欢迎留言。
转账自:https://www.textarea.com/zhicheng/yong-python-lijie-fuwuqi-moxing-shang-566/