项目中异步调用 ping 和 nmap 实现对目标 ip 和所在网关的探测
Subprocess.STREAM 不用担心进程返回数据过大造成的死锁, Subprocess.PIPE 会有这个问题.
import tornado.gen from tornado.process import Subprocess @tornado.gen.coroutine def run_command(command): """run command""" process = Subprocess( [command], stdout=Subprocess.STREAM, stderr=Subprocess.STREAM, shell=True ) out, err = yield [process.stdout.read_until_close(), process.stderr.read_until_close()] raise tornado.gen.Return((out, err)) class NmapHandler(tornado.web.RequestHandler): """handle nmap check request""" @tornado.gen.coroutine def get(self): ip = self.get_argument("ip", None) if not ip: self.write(json.dumps({})) raise tornado.gen.Return(None) nmap_resp, _ = yield run_command(nmap % ip) self.write(json.dumps( { "ip": ip, "nmap_resp": nmap_resp } ))
前一阵想到一个问题, run_command 如何进行异常处理. 原则上, 异常除了本地存储, 还应该上报调用者.
子进程执行的命令是固定的, 出现异常只会有两种情况, 第一, 创建子进程失败, 触发 OSError, 第二, 子进程中执行的 shell 命令失败, 报错信息重定向到stderr.
所以, 暂时的处理是捕捉 OSError.
使用非阻塞线程池, 调用 paramiko 来分发检测任务.
from concurrent.futures import ThreadPoolExecutor from tornado.concurrent import run_on_executor class FailureHandler(tornado.web.RequestHandler): """handle server check request""" executor = ThreadPoolExecutor(100) @run_on_executor def get(self): ip = self.get_argument("ip", None) if not ip: self.write(json.dumps({})) raise tornado.gen.Return(None) resp = distributer(ip) if resp: resp = 0 else: resp = 1 self.write(json.dumps( { "ip": ip, "failure_rslt": resp } ))