在这个demo中,主要是使用了Tornado中异步的TCP client和server来实现一个简单的echo效果(即客户端发送的message会从server端返回到client)。代码的github链接点这里。
1 Server端代码分析
1 import logging 2 from tornado.ioloop import IOLoop 3 from tornado import gen 4 from tornado.iostream import StreamClosedError 5 from tornado.tcpserver import TCPServer 6 from tornado.options import options, define 7 8 define("port", default=9888, help="TCP port to listen on") 9 logger = logging.getLogger(__name__) 10 11 12 class EchoServer(TCPServer): 13 @gen.coroutine 14 def handle_stream(self, stream, address): 15 while True: 16 try: 17 data = yield stream.read_until(b" ") 18 logger.info("Received bytes: %s", data) 19 if not data.endswith(b" "): 20 data = data + b" " 21 yield stream.write(data) 22 except StreamClosedError: 23 logger.warning("Lost client at host %s", address[0]) 24 break 25 except Exception as e: 26 print(e) 27 28 29 if __name__ == "__main__": 30 options.parse_command_line() 31 server = EchoServer() 32 server.listen(options.port) 33 logger.info("Listening on TCP port %d", options.port) 34 IOLoop.current().start()
涉及到引入的模块及作用:
import logging //用来记录日志
from tornado.ioloop import IOLoop
from tornado import gen //实现异步
from tornado.iostream import StreamClosedError //处理iostream
from tornado.tcpserver import TCPServer // 非阻塞单线程的TCP server及其相关哦功能
from tornado.options import options, define // options参数相关
server端代码并不复杂,首先定义了默认的监听端口,并且生成了一个logger实例。(logging用法点这里。)
define("port", default=9888, help="TCP port to listen on") logger = logging.getLogger(__name__)
然后创建EchoServer类如下。这个类继承TCPServer(更多参考)。里面重写了handle_stream方法。handle_stream接收了stream和address两个参数,stream是一个iostream的object,address是client端地址。逻辑比较清晰,使用try, except来捕获io错误,如果没有错误的话,会读取stream内容直到遇到换行停止。读取到的data会写回到client端,通过write(data)。
handle_stream方法是用了@gen.coroutine装饰器和yield来实现异步读取写回iostream。
class EchoServer(TCPServer): @gen.coroutine def handle_stream(self, stream, address): while True: try: data = yield stream.read_until(b" ") logger.info("Received bytes: %s", data) if not data.endswith(b" "): data = data + b" " yield stream.write(data) except StreamClosedError: logger.warning("Lost client at host %s", address[0]) break except Exception as e: print(e)
最后在main部分,生成一个EchoServer实例并监听定义的端口,然后启动事件的ioloop。
options.parse_command_line() server = EchoServer() server.listen(options.port) logger.info("Listening on TCP port %d", options.port) IOLoop.current().start()
2 Client端代码分析
1 from __future__ import print_function 2 from tornado.ioloop import IOLoop 3 from tornado import gen 4 from tornado.tcpclient import TCPClient 5 from tornado.options import options, define 6 7 define("host", default="localhost", help="TCP server host") 8 define("port", default=9888, help="TCP port to connect to") 9 define("message", default="ping", help="Message to send") 10 11 12 @gen.coroutine 13 def send_message(): 14 stream = yield TCPClient().connect(options.host, options.port) 15 yield stream.write((options.message + " ").encode()) 16 print("Sent to server:", options.message) 17 reply = yield stream.read_until(b" ") 18 print("Response from server:", reply.decode().strip()) 19 20 21 if __name__ == "__main__": 22 options.parse_command_line() 23 IOLoop.current().run_sync(send_message)
client 端首先定义了3个option, host,port,以及message,分别为要连接的服务端的host ip, 端口和要发送的message.
send_message用来向server端发送和接收数据。同样这里使用@gen.coroutine和yield来实现异步。
@gen.coroutine def send_message(): stream = yield TCPClient().connect(options.host, options.port) yield stream.write((options.message + " ").encode()) print("Sent to server:", options.message) reply = yield stream.read_until(b" ") print("Response from server:", reply.decode().strip())
3 运行效果
server端运行后,可以使用运行client.py发送消息,发送完成后连接会端口。也可以使用telnet保持连接,交互式的发送数据给server端。