zoukankan      html  css  js  c++  java
  • Python 程序:基于RabbitMQ实现主机管理

    Python 程序:基于RabbitMQ实现主机管理


    1、需求

    2、代码

    3、测试样图


    一、需求

    1、可以异步地执行多个命令

    2、可以同时对多台机器执行命令

    举例

    >>:run "df -h" --hosts 192.168.3.55 10.4.3.4 
    task id: 45334
    >>: check_task 45334 
    >>:

    二、代码

     1 import random
     2 import pika
     3 
     class rpc_cilent(object):
         """Blocking RPC client that runs one shell command on one managed host.

         Every ``call`` opens a fresh broker connection for the target host,
         publishes the command, waits for the reply carrying the same
         correlation id and stores the decoded reply in the caller's dict.

         NOTE(review): ``queue_declare(exclusive=True)`` and
         ``basic_consume(callback, no_ack=True, queue=...)`` are kept in the
         call form used by the original listing; newer pika releases changed
         these signatures -- confirm against the installed pika version.
         """

         def connect(self):
             """Open a connection to the broker running on localhost."""
             self.connection = pika.BlockingConnection(
                 pika.ConnectionParameters(host='localhost'))

         def connect_1(self):
             """Open an authenticated connection to the broker at 192.168.43.165."""
             # NOTE(review): credentials are hard-coded in the listing; move them
             # to configuration before using this outside a demo.
             self.credentials = pika.PlainCredentials('zz', '123456')
             self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                 '192.168.43.165', 5672, '/', self.credentials))

         def on_response(self, ch, method, props, body):
             """Store ``body`` as the response if it answers the pending request.

             Replies whose correlation id differs from ``self.corr_id`` are
             ignored.
             """
             if self.corr_id == props.correlation_id:
                 self.response = body

         @staticmethod
         def _task_banner(task_id, host, command):
             """Return the green status line printed after a task completes."""
             # \033 is the ESC character that starts an ANSI colour sequence.
             return '\033[42;0mtask_id: %s host: %s cmd: %s\033[0m' % (
                 task_id, host, command)

         def call(self, command, host, tmp_dict):
             """Run ``command`` on ``host`` and record the output in ``tmp_dict``.

             Args:
                 command: Shell command text sent as the message body.
                 host: Target host; only "127.0.0.1" and "192.168.43.165" are
                     supported because they are the only brokers this class
                     knows how to connect to.
                 tmp_dict: Dictionary that receives ``{task_id: output}``.

             Raises:
                 ValueError: If ``host`` is not one of the supported hosts.
             """
             if host == "127.0.0.1":
                 self.connect()
             elif host == "192.168.43.165":
                 self.connect_1()
             else:
                 # Without this guard the method would use a missing or stale
                 # connection left over from a previous call.
                 raise ValueError("unsupported host: %s" % host)

             try:
                 self.channel = self.connection.channel()
                 # Private, auto-named queue on which the reply is delivered.
                 result = self.channel.queue_declare(exclusive=True)
                 self.callback_queue = result.method.queue
                 self.channel.basic_consume(self.on_response, no_ack=True,
                                            queue=self.callback_queue)
                 self.response = None
                 self.corr_id = str(random.randint(10000, 99999))
                 # "127.0.0.1" is the request queue name; the server listing
                 # declares a queue with this name.
                 self.channel.basic_publish(
                     exchange='',
                     routing_key="127.0.0.1",
                     properties=pika.BasicProperties(
                         reply_to=self.callback_queue,
                         correlation_id=self.corr_id),
                     body=str(command))
                 while self.response is None:
                     self.connection.process_data_events()
             finally:
                 # One connection is opened per call, so release it here instead
                 # of leaking a socket for every task.
                 if self.connection.is_open:
                     self.connection.close()

             task_id = self.corr_id
             tmp_dict[task_id] = self.response.decode()
             print(self._task_banner(task_id, host, command))
    class rpc_start(object):
        """Interactive prompt that dispatches host-management commands.

        Supported commands are ``run``, ``check_task``, ``check_all`` and
        ``help``. Results of finished tasks are kept in ``self.tmp_dict``
        (task id -> command output) until they are read with ``check_task``.
        Constructing the object immediately enters the prompt loop.
        """

        # Commands the prompt may dispatch to; anything else is rejected so a
        # user cannot invoke arbitrary attributes such as ``start``.
        COMMANDS = ("run", "check_task", "check_all", "help")

        # Hosts for which the client class knows how to open a connection.
        KNOWN_HOSTS = ("127.0.0.1", "192.168.43.165")

        def __init__(self):
            self.tmp_dict = {}
            self.start()

        def check_all(self, *args):
            """Print the id of every task whose result has not been read yet."""
            for index, key in enumerate(self.tmp_dict):
                print(index, '\033[42;0mtask_id: %s\033[0m' % key)

        def check_task(self, user_cmd):
            """Print and discard the result of the task named in ``user_cmd``.

            ``user_cmd`` is the full input line, e.g. ``check_task 45334``.
            A missing task id prints the usage text; an unknown task id prints
            an error instead of raising.
            """
            fields = user_cmd.split()
            if len(fields) < 2:
                self.help()
                return
            task_id = fields[1]
            try:
                result = self.tmp_dict.pop(task_id)
            except KeyError:
                print("\033[31;0mtask not found: %s\033[0m" % task_id)
                return
            print(result)

        @staticmethod
        def _parse_run(user_cmd):
            """Split a ``run`` line into ``(command, hosts)``.

            The command is the text between the first pair of double quotes;
            the hosts are the whitespace-separated words after the ``hosts``
            keyword that follows the closing quote (``--hosts`` also matches).

            Raises:
                ValueError: If the quotes, the keyword or the host list is
                    missing, or the quoted command is blank.
            """
            parts = user_cmd.split('"')
            if len(parts) < 3:
                raise ValueError("command must be wrapped in double quotes")
            command = parts[1]
            # Only look for the keyword after the closing quote so a command
            # such as "cat /etc/hosts" is not mistaken for the host list.
            tail = '"'.join(parts[2:])
            _, keyword, after = tail.partition('hosts')
            hosts = after.split()
            if not keyword or not hosts or not command.strip():
                raise ValueError("expected: run \"<command>\" hosts <host> ...")
            return command, hosts

        def run(self, user_cmd):
            """Run the quoted command from ``user_cmd`` on each listed host.

            Unknown hosts are reported and skipped. A malformed line prints an
            error followed by the usage text.
            """
            try:
                command, hosts = self._parse_run(user_cmd)
            except ValueError:
                print("\033[31;0mcommand not found\033[0m")
                self.help()
                return
            for host in hosts:
                if host not in self.KNOWN_HOSTS:
                    print("\033[41;0mno host:%s\033[0m" % host)
                    continue
                try:
                    # ``rpc_cilent`` is the module-level client instance.
                    rpc_cilent.call(command, host, self.tmp_dict)
                except (TypeError, AssertionError) as err:
                    # Stop at the first failing host, but say why.
                    print("\033[31;0mtask failed on %s: %s\033[0m" % (host, err))
                    break

        def help(self, *args):
            """Print the usage text; extra arguments are accepted and ignored."""
            print('\033[34;0mUsage: run "df -h" hosts 127.0.0.1 192.168.43.165 \033[0m')
            print('\033[34;0m       check_task 97128\033[0m')
            print('\033[34;0m       check_all\033[0m')

        def start(self):
            """Read commands from standard input until end-of-input or Ctrl-C."""
            self.help()
            while True:
                try:
                    user_cmd = input("->>").strip()
                except (EOFError, KeyboardInterrupt):
                    print()
                    return
                if not user_cmd:
                    continue
                self.cmd = user_cmd.split()[0]
                if self.cmd in self.COMMANDS:
                    getattr(self, self.cmd)(user_cmd)
                else:
                    print("\033[31;0mcommand not found!\033[0m")
                    self.help()
    84 
    # Rebind the class name to a single shared instance: rpc_start.run calls
    # ``rpc_cilent.call(...)`` on this module-level object.
    rpc_cilent = rpc_cilent()
    # Constructing rpc_start enters the interactive prompt loop immediately.
    start = rpc_start()
    rpc_client
     1 import pika
     2 import subprocess
     3 
     class rpc_server(object):
         """RPC worker that executes received shell commands and replies with output.

         Constructing the object connects to the broker, declares the request
         queue and starts consuming, so the constructor blocks while serving.

         NOTE(review): ``basic_consume(callback, queue=...)`` is kept in the
         call form used by the original listing; newer pika releases changed
         this signature -- confirm against the installed pika version.
         """

         def __init__(self, host, queue):
             """Connect to the broker at ``host`` and serve requests from ``queue``."""
             self.connection = pika.BlockingConnection(
                 pika.ConnectionParameters(host=host))
             self.channel = self.connection.channel()
             self.channel.queue_declare(queue=queue)
             # Hand this worker one unacknowledged request at a time.
             self.channel.basic_qos(prefetch_count=1)
             self.channel.basic_consume(self.on_request, queue=queue)
             print(" [x] Awaiting RPC requests")
             self.channel.start_consuming()

         def command(self, cmd, encoding=None):
             """Run ``cmd`` through the shell and return its output as text.

             Args:
                 cmd: Command line to execute.
                 encoding: Codec used to decode the output. Defaults to the
                     locale's preferred encoding, so the same code works on
                     both the Windows and Linux hosts instead of assuming GBK.

             Returns:
                 Standard output if the command wrote any, otherwise standard
                 error. Undecodable bytes are replaced rather than raising.
             """
             import locale

             if encoding is None:
                 encoding = locale.getpreferredencoding(False)
             # SECURITY: the command text comes straight from the message queue
             # and is executed by the shell; only expose this queue to trusted
             # clients.
             proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
             # communicate() drains both pipes together and reaps the child;
             # reading stdout to the end first can deadlock once stderr fills.
             out, err = proc.communicate()
             raw = out if out else err
             msg = raw.decode(encoding, errors='replace')
             print(msg)
             return msg

         def on_request(self, ch, method, props, body):
             """Execute the command in ``body`` and publish the result to the caller.

             The reply goes to ``props.reply_to`` with the request's correlation
             id, after which the request is acknowledged.
             """
             cmd = body.decode()
             print(cmd)
             try:
                 response = self.command(cmd)
             except Exception as err:
                 # Handler boundary: always answer, otherwise the client waits
                 # forever for a reply that never comes.
                 response = "command failed: %s" % err
             ch.basic_publish(
                 exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(
                     correlation_id=props.correlation_id),
                 body=response)
             ch.basic_ack(delivery_tag=method.delivery_tag)
    29 
    # Serve requests from the queue named "127.0.0.1" on the local broker; the
    # client listing publishes to this queue name. The constructor blocks.
    server = rpc_server("localhost", "127.0.0.1")
    rpc_server

    三、测试样图

    启动客户端

    windows端服务器启动

    linux端服务器启动

    client端测试:

  • 相关阅读:
    微软一站式示例代码库 11月小结
    Linux服务器使用SSH的命令 [转]
    简明 Vim 练级攻略 [转]
    如何在Windows下使用LAPACK和ARPACK [转]
    使用setuptools自动安装python模块 [转]
    VS2010下GSL的配置 [转]
    poj3255 Roadblocks ***
    协方差矩阵的详细说明 [转]
    VC环境下LIB引用问题(LNK1104) [转]
    Dreamweaver CS5: "Configuration error"
  • 原文地址:https://www.cnblogs.com/hy0822/p/9284507.html
Copyright © 2011-2022 走看看