一、作业需求
1、可以对指定机器异步的执行多个命令
例子:
请输入操作指令>>>:run ipconfig --host 127.0.0.0
in the call tack_id:[24869c59-bdc3-4cfc-9a00-313e257d9f58] cmd:[ipconfig] host:[127.0.0.0]
>请输入操作指令>>>:check_all
查看所有任务编号:
1 24869c59-bdc3-4cfc-9a00-313e257d9f58
2 f2d05df2-05b7-4059-93cc-de7c80aee5af
请输入操作指令>>>:check_task 24869c59-bdc3-4cfc-9a00-313e257d9f58
查看任务[24869c59-bdc3-4cfc-9a00-313e257d9f58]返回结果:
Windows IP 配置
以太网适配器 本地连接:
连接特定的 DNS 后缀 . . . . . . . :
本地链接 IPv6 地址. . . . . . . . : fe80::cddb:da51:7e58:2515%11
IPv4 地址 . . . . . . . . . . . . : 192.168.1.119
二、readme
一、作业需求: 1、可以对指定机器异步的执行多个命令 例子: 请输入操作指令>>>:run ipconfig --host 127.0.0.0 in the call tack_id:[24869c59-bdc3-4cfc-9a00-313e257d9f58] cmd:[ipconfig] host:[127.0.0.0] 请输入操作指令>>>:run dir --host 127.0.0.0 in the call tack_id:[f2d05df2-05b7-4059-93cc-de7c80aee5af] cmd:[dir] host:[127.0.0.0] >请输入操作指令>>>:check_all 查看所有任务编号: 1 24869c59-bdc3-4cfc-9a00-313e257d9f58 2 f2d05df2-05b7-4059-93cc-de7c80aee5af 请输入操作指令>>>:chedk_task 24869c59-bdc3-4cfc-9a00-313e257d9f58 请输入有效操作指令 请输入操作指令>>>:check_task 24869c59-bdc3-4cfc-9a00-313e257d9f58 注意,每执行一条命令,即立刻生成一个任务ID,不需等待结果返回,通过命令check_task TASK_ID来得到任务结果 二、博客地址:http://www.cnblogs.com/catepython/p/9051490.html 三、运行环境 操作系统:Win10 Python:3.6.4rcl Pycharm:2017.3.4 四 、具体实现 思路解析: 分析需求其实可以发现,输入命令为消费者,执行命令是生产者,参照RabbitMQ的官方文档rpc部分和课上的代码就可以了。 1. 使用RabbitMQ_RPC, Queen使用主机IP 2. 消费者输入命令,分割字段,获取run,check_task,check_task_all,host等信息,传给生产者。 3. 生产者执行命令处理windows/linux不同消息发送情况windows decode(‘gbk’) linux decode('utf-8'),返回结果。 4. 消费者将结果存成字典,查询结果后删除。 五、测试流程 1、在本次作业中处理了各种操作指令的异常处理 如:数组越界、使用check_task操作时无效任务ID处理 2、对用户输入的格式就行了正则 如:IP地址格式的判断 3、同时能对多个服务主机发送并接收相应的数据信息 六、备注 1、完成本次作业看了5遍视频,百度了所有大神们的心得代码,终于理清了整个RPC框架和原理
三、流程图
四、目录结构图
五、核心代码
conf目录
#-*-coding:utf-8 -*- # Author: D.Gray import os BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) LOCAL_HOST = ('localhost')
core主逻辑目录
#-*-coding:utf-8 -*- # Author: D.Gray import pika,uuid,time,re from conf import setting class Producer(object): def __init__(self): self.connaction = pika.BlockingConnection(pika.ConnectionParameters(setting.LOCAL_HOST)) self.channel = self.connaction.channel() self.result = self.channel.queue_declare(exclusive=True) #生产一个随机queue self.callback_queue = self.result.method.queue # 声明一个queue,一收到消息就调用on_response函数 self.channel.basic_consume(self.on_response,no_ack=True,queue=self.callback_queue) def on_response(self,ch,method,props,body): ''' 处理收到消息函数 :param ch: :param method: :param props: :param body: :return: ''' if self.corr_id == props.correlation_id: self.response = body def call(self,cmd,host): ''' 发送操作指令至服务端函数 :param cmd: :param host: :return: ''' self.response = None self.corr_id = str(uuid.uuid4()) #把操作指令发送给服务端,并把与服务端约定好一个随机queue一同传过去 self.channel.basic_publish(exchange='',routing_key=host, properties=pika.BasicProperties( reply_to=self.callback_queue, #与服务端约定好一个随机queue进行第二次数据交互 correlation_id = self.corr_id ), body=str(cmd) ) while self.response is None: self.connaction.process_data_events() #非阻塞版的start_consume # print('no message...') # time.sleep(1) task_id = self.corr_id res = self.response.decode() tmp_dict[task_id] = res print('in the call tack_id:[%s] cmd:[%s] host:[%s]'%(task_id,cmd,host)) return task_id,res def help(): ''' 帮助函数 :return: ''' meg = '''