zoukankan      html  css  js  c++  java
  • python作业(第十一周)基于RabbitMQ rpc实现的主机管理

    作业需求:

    可以对指定机器异步的执行多个命令

    例子:

    >>:run "df -h" --hosts 192.168.3.55 10.4.3.4

    task id: 45334

    >>: check_task 45334

    >>:

    注意,每执行一条命令,即立刻生成一个任务ID,不需等待结果返回,通过命令check_task TASK_ID来得到任务结果

    思路解析:

    分析需求其实可以发现,输入命令为消费者,执行命令是生产者,参照RabbitMQ的官方文档rpc部分和课上的代码就可以了。

    1. 使用RabbitMQ_RPC, Queen使用主机IP

    2. 消费者输入命令,分割字段,获取run,check_task,check_task_all,host等信息,传给生产者。

    3. 生产者执行命令处理windows/linux不同消息发送情况windows decode(‘gbk’) linux decode('utf-8'),返回结果。

    4. 消费者将结果存成字典,查询结果后删除。 

    关于疑问在测试的过程中发现

      while self.response is None:
                self.connection.process_data_events()

    在这段中如果没有消息返回就一直处于死循环也就是说,如果生产者挂掉一台,那我就会卡住,查看官方文档,对这个简单RPC实现也是延伸了下这个问题。

    1. 如果没有服务器运行,客户应该如何应对?
    2. 客户端是否应该对RPC有某种超时?
    3. 如果服务器发生故障并引发异常,是否应将其转发给客户端?
    4. 在处理之前防止无效的传入消息(例如检查边界)。

    思维导图:

    核心代码:

    消费者: 

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # Time:2017/12/6 15:52
    __Author__ = 'Sean Yao'
    import pika
    import uuid
    
    class CommandToRabbitmq(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
    
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(self.on_response, no_ack=True,
                                       queue=self.callback_queue)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, command, host):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            ack = self.channel.basic_publish(exchange='',
                                             routing_key=host,
                                             properties=pika.BasicProperties(
                                                 reply_to=self.callback_queue,
                                                 correlation_id=self.corr_id),
                                             body=str(command))
            while self.response is None:
                # 等待消息
                self.connection.process_data_events()
    
            task_id = self.corr_id
            res = self.response.decode()
            tmp_dict[task_id] = res
            print('task_id: %s host: %s cmd: %s ' % (self.corr_id, host, command))
            return self.corr_id, self.response.decode()
    
    def help():
        print('Usage: run "df -h" --hosts 127.0.0.1 192.168.84.66 ')
        print('       check_task 54385061-aa3a-400f-8a21-2be368e66493 ')
        print('       check_task_all')
    
    
    def start(command_input):
        command_list = command_input.split()
        if command_list[0] == 'check_task':
            try:
                print(tmp_dict[command_list[1]])
                del tmp_dict[command_list[1]]
            except IndexError:
                help()
        elif command_list[0] == 'run':
            # 获取命令主机,并循环执行
            try:
                ip_hosts_obj = command_input.split('run')
                hosts_obj = (ip_hosts_obj[1].split('--hosts'))
                hosts = hosts_obj[1].strip().split()
                command = command_input.split(""")[1]
                for host in hosts:
                    try:
                        command_rpc.call(command, host)
                    except TypeError and AssertionError:
                        break
            except IndexError:
                print('-bash: %s command not found' % command_input)
                help()
        elif command_list[0] == 'check_task_all':
            for index, key in enumerate(tmp_dict.keys()):
                print(index, 'task_id: %s' % key)
        elif command_list[0] == 'help':
            help()
        else:
            print('-bash: %s command not found' % command_input)
            help()
    
    
    command_rpc = CommandToRabbitmq()
    exit_flag = True
    tmp_dict = {}
    help()
    while exit_flag:
        command_input = input('请输入命令>>>:').strip()
        if len(command_input) == 0:
            continue
        else:
            start(command_input)
    View Code

    生产者:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # Time:2017/12/6 15:52
    __Author__ = 'Sean Yao'
    import pika
    import time
    import subprocess
    import platform
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    
    # rabbitmq 设有权限的连接
    # connection = pika.BlockingConnection(pika.ConnectionParameters(
    # host='192.168.1.105',credentials=pika.PlainCredentials('admin', 'admin')))
    
    channel = connection.channel()
    channel.queue_declare(queue='127.0.0.1')
    os_res = platform.system()
    
    # def command(cmd, task_id):
    def command(cmd):
        if os_res == 'Windows':
            res = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            msg = res.stdout.read().decode('gbk')
            if len(msg) == 0:
                msg = res.stderr.read().decode('gbk')
            print(msg)
            return msg
    
        else:
            res = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print(res)
            msg = res.stdout.read().decode()
            if len(msg) == 0:
                msg = res.stderr.read().decode()
            return msg
    
    def on_request(ch, method, props, body):
        cmd = body.decode()
        respone = command(cmd)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=respone)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='127.0.0.1')
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    View Code

     

    程序测试样图:

     

     

  • 相关阅读:
    CSUFT 1002 Robot Navigation
    CSUFT 1003 All Your Base
    Uva 1599 最佳路径
    Uva 10129 单词
    欧拉回路
    Uva 10305 给任务排序
    uva 816 Abbott的复仇
    Uva 1103 古代象形文字
    Uva 10118 免费糖果
    Uva 725 除法
  • 原文地址:https://www.cnblogs.com/sean-yao/p/8110006.html
Copyright © 2011-2022 走看看