1.在本章开始之前,需要先理解同步与异步,阻塞与非阻塞的区别:
“阻塞”与"非阻塞"与"同步"与“异步"不能简单的从字面理解,提供一个从分布式系统角度的回答。
1.同步与异步
同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication)
所谓同步,就是在发出一个*调用*时,在没有得到结果之前,该*调用*就不返回。但是一旦调用返回,就得到返回值了。
换句话说,就是由*调用者*主动等待这个*调用*的结果。
而异步则是相反,*调用*在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在*调用*发出后,*被调用者*通过状态、通知来通知调用者,或通过回调函数处理这个调用。
典型的异步编程模型比如Node.js
举个通俗的例子:
你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下",然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。
而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。
2. 阻塞与非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
还是上面的例子,
你打电话问书店老板有没有《分布式系统》这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。
在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关。
2.在套接字服务器程序中使用ForkingMixIn:
# -*- coding: UTF-8 -*-
# 编写一个异步Python套接字服务器程序,服务器处理客户端发出的请求时不能阻塞,
# 因此要找一种机制来单独处理每个客户端,
# Python2.7中的SocketServer模块提供了两个实用类:ForkingMixIn和ThreadingMixIn,
# ForkingMixIn会为每一个客户端请求派生一个新进程
# !usr/bin/env python
# Python Network Programming Cookbook --Chapter -1
# This program is optimized for Python 2.7
# It may run on any other version with/without modifications
import os
import socket
import threading
import SocketServer
SERVER_HOST='localhost'
SERVER_PORT=0
BUF_SIZE=1024
ECHO_MSG='Hello echo server'
class ForkingClient():
"""一个forKing服务的客户端测试"""
def __init__(self,ip,port):
self.sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.sock.connect((ip,port))
def run(self):
"""客户端和服务器交互"""
current_process_id=os.getpid()
print 'PID%s 发送回显消息到服务器:"%s"' %(current_process_id,ECHO_MSG)
sent_data_length=self.sock.send(ECHO_MSG)
print "发送:%d characters ,so far....." %sent_data_length
response=self.sock.recv(BUF_SIZE)
print "PID%s 收到 :%s" %(current_process_id,response[5:])
def shutdown(self):
"""清除客户端的套接字"""
self.sock.close()
class ForkingServerRequestHandler(SocketServer.BaseRequestHandler):
def handle(self):
data=self.request.recv(BUF_SIZE)
current_process_id=os.getppid()
response='%s :%s' %(current_process_id,data)
print "服务器发送回应[当前的进程ID:data]=[%s]" %response
self.request.send(response)
return
class ForkingServer(SocketServer.ForkingMixIn,SocketServer.TCPServer):
"""没什么可以添加的这里,继承父类中必须的属性"""
pass
def main():
server=ForkingServer((SERVER_HOST,SERVER_PORT),ForkingServerRequestHandler)
ip,port=server.server_address
server_thread=threading.Thread(target=server.serve_forever)
server_thread.setDaemon(True)
server_thread.start()
print '服务器loop运行PID:%s' %os.getpid()
client1=ForkingClient(ip,port)
client1.run()
client2=ForkingClient(ip,port)
client2.run()
server.shutdown()
client1.shutdown()
client2.shutdown()
server.socket.close()
if __name__=='__main__':
main()
程序由3部分组成:
建立一个 SocketServer 所要做的大部分工作是定义一个请求处理( request handler )类。
它是 SocktServer 模块里 BaseRequestHandler 类的子类,并且每一个 request handler 对象在客户端连接到服务端时处理着一个客户端的请求。
这是在请求处理的 handle 方法里实现的。
无论什么时候连接到这个服务端, TCPServer 类将传递正确的变量来建立一个新的 RequestHandler ,并调用 handle 方法来处理请求。
下面是服务器类和main主方法:
process_request会继承自ThreadingMixIn或者ForkingMixIn,对每个request新建线程,然后由线程调用finish_request
运行结果: