zoukankan      html  css  js  c++  java
  • rabbitmq之rpc

    环境:windows或者Linux,python3.6,rabbitmq3.5
    要求:
    可以对指定机器异步的执行多个命令
    例子:
    >>:run "df -h" --hosts 192.168.3.55 10.4.3.4
    task id: 45334
    >>: check_task 45334
    >>:
    注意,每执行一条命令,即立刻生成一个任务ID,不需等待结果返回,通过命令check_task TASK_ID来得到任务结果

    项目结构:
    rpc_client ---|
    bin ---|
    start_client.py ......启动生成者
    core---|
    main.py ......生产者主程序
    rpc_server ---|
    bin ---|
    start_server.py ......启动消费者
    core---|
    ftp_server.py ......消费者主程序


    用法:
    启动start_client.py,输入命令格式为 run "shell指令或者dos指令" --hosts ip ip
    启动start_server.py
    如果消费者本机ip为输入Ip之一,则回收到指令并返回指令结果
    返回后打印task id
    然后通过指令check_task id 则可以查询返回结果

    producer:

    #!/usr/bin/env python
    # -*-coding:utf-8-*-
    # Author:zh
    import pika
    import random
    
    
    class RpcClient(object):
        # 这个类作为生成者,用来发消息
        def __init__(self, command, ip, corr_id):
            self.response = None   # 消息
            self.command = command  # 命令
            self.queue_name = ip  # 管道名称为IP
            self.corr_id = str(corr_id)   # 随机生成的task id
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.queue_name, durable=True)
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
            self.channel.basic_consume(self.on_response,
                                       no_ack=True,
                                       queue=self.callback_queue)
    
        def on_response(self, ch, method, props, body):
            # 收到消息后根据corr_id判断消息是由谁返回
            if self.corr_id == props.correlation_id:
                self.response = body.decode()
    
        def call(self):
            # 发送命令
            self.channel.basic_publish(exchange='',
                                       routing_key=self.queue_name,
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=self.corr_id,
                                           delivery_mode=2,
                                       ),
                                       body=self.command)
            while self.response is None:
                self.connection.process_data_events()  # 非阻塞版的start_consume
            return self.response
    
    
    class Command(object):
        # 这个类用来解析命令并传给生产者
        def __init__(self):
            self.used_id = []   # 随机生成ID,避免重复,已生成的ID放入此列表中
            self.id_data = {}   # 将返回结果放入字典中存放
    
        def run(self, *args):
            # 这个方法用来解析命令并传给生产者
            cmd = args[0]
            cmd_str = cmd.split(""")
            try:
                command = cmd_str[1]
                ip_list = cmd_str[2].split()
                ip_list.remove('--hosts')
                corr_id = random.randint(10000, 99999)
                while corr_id in self.used_id:  # 防止corr_id重复
                    corr_id = random.randint(10000, 99999)
                else:
                    self.used_id.append(corr_id)
                return_body = {}
                for i in ip_list:
                    rpc_obj = RpcClient(command, i, corr_id)
                    return_body[i] = rpc_obj.call()
                self.id_data[corr_id] = return_body
                print("task id:", corr_id)
            except IndexError:
                print("输入格式错误")
                self.help()
            except ValueError:
                print("no --hosts")
                self.help()
    
        def check_task(self, *args):
            # 这个方法用来获取ID结果
            cmd = args[0]
            cmd_str = cmd.split()
            if len(cmd_str) == 2:
                corr_id = int(cmd_str[1])
                if corr_id in list(self.id_data.keys()):
                    result = self.id_data.pop(corr_id)
                    for key in result:
                        print("33[32;1m%s:33[0m" % key)
                        for i in eval(result[key]):
                            print(i)
                else:
                    print("no this task id")
    
        @staticmethod
        def help():
            print('''请输入以下命令格式:33[32;1m
    start command: run "command" --hosts ip ip 
    
    get answer: check_task:id33[0m
            ''')
    
    
    def start():
        cmd_obj = Command()
        while True:
            cmd = input("-->:")
            if len(cmd) == 0:
                continue
            cmd_str = cmd.split()
            cmd_title = cmd_str[0]
            if hasattr(cmd_obj, cmd_title):
                func = getattr(cmd_obj, cmd_title)
                func(cmd)
            else:
                cmd_obj.help()
    main

    consumer:

    #!/usr/bin/env python
    # -*-coding:utf-8-*-
    # Author:zh
    import pika
    import subprocess
    import locale
    import codecs
    import platform
    import socket
    import os
    
    
    class RpcServer(object):
        #  这个类用来接收发来本机的消息,管道名为本机IP,将结果发往生成者
        def __init__(self, queue_name):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=queue_name, durable=True)
            self.channel.basic_consume(self.on_request, queue=queue_name)
    
        def start(self):
            # 开始等待消息
            self.channel.start_consuming()
    
        def on_request(self, ch, method, props, body):
            # 收到消息后返回消息结果
            response = self.run_order(body.decode())
            ch.basic_publish(exchange='',
                             routing_key=props.reply_to,
                             properties=pika.BasicProperties(correlation_id=props.correlation_id,
                                                             delivery_mode=2,),
                             body=str(response))
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        @staticmethod
        def run_order(cmd):
            # 这个方法用来执行命令并返回结果
            cmd_result = subprocess.Popen(args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            return [
                    cmd_result.stdout.read().decode(codecs.lookup(locale.getpreferredencoding()).name),
                    cmd_result.stderr.read().decode(codecs.lookup(locale.getpreferredencoding()).name)
                    ]
    
    if __name__ == "__main__":
        hostname = socket.gethostname()
        system_type = platform.system()
        if system_type == 'Windows':
            ip = socket.gethostbyname(hostname)
        elif system_type == "Linux":
            ip = str(os.popen("LANG=C ifconfig | grep "inet addr" | grep -v "127.0.0.1" | awk -F "
                              "":" '{print $2}' | awk '{print $1}'").readlines()[0]).strip()  # 从Linux获取ip
        prc_obj = RpcServer(ip)
        prc_obj.start()
    View Code
  • 相关阅读:
    selenium + python网页自动化测试环境搭建
    工作总结
    脚本测试总结
    一些知识
    反相器
    递归算法设计
    什么是递归
    CSS3弹性盒模型布局模块
    小方法
    第24章 最佳实践
  • 原文地址:https://www.cnblogs.com/zh-20170913/p/8405937.html
Copyright © 2011-2022 走看看