zoukankan      html  css  js  c++  java
  • Python开发【项目】:RPC异步执行命令(RabbitMQ双向通信)

    RPC异步执行命令
    需求:
    • 利用RibbitMQ进行数据交互
    • 可以对多台服务器进行操作
    • 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
    • 实现异步操作

    本节涉及最多的还是rabbitmq通信原理知识,要求安装rabbitmq服务

    程序用广播topic模式做更好 

    程序目录结构:

    程序简介:

    复制代码
    # 异步rpc程序
    
    
    ## 1、需求
    - [ ] 利用RibbitMQ进行数据交互
    - [ ] 可以对多台服务器进行操作
    - [ ] 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
    - [ ] 实现异步操作
    
    ## 备注
    
    - [ ] RabbitMQ队列名:
                        ①执行命令时,队列名为服务器端的IP
                        ②查询数据时,用的是回调时随机生成的callback_queue名
    - [ ] threading多线程:
                        实现命令执行后不等待执行结果,依然可以输入新的指令
    
    - [ ] 执行命令格式:
                     -->>run "dir" host 192.168.5.107 127.0.0.1
                            dir     server端要执行的命令
                            host    host后可跟一个或多个可以通过rabbitMQ的服务器地址
    
    - [ ] 查看后台所有的TASK_ID信息:
                     -->>check_all
         显示结果样式:TASK_ID【76786】    HOST【192.168.5.107】    COMMAND【dir】
                      TASK_ID【10307】    HOST【127.0.0.1】    COMMAND【dir】
    
    - [ ] 查看TASK_ID对应的执行结果:
                     -->>check_task 10307
                             10307 为check_all查到的TASK_ID
    复制代码

    程序流程图:

    服务器端:

    复制代码
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    
    import pika
    import os
    
    class Server(object):
        def __init__(self,rabbitmq,queue_name):
            self.queue_name = queue_name
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=rabbitmq))
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.queue_name)
    
        def handle(self,command):
            command = command.decode()
            print(command,type(command))
            message = os.popen(command).read()
            if not message:
                message = "Wrong Command"
            return message
    
        def on_request(self,ch, method, props, body):
            response = self.handle(body)
            ch.basic_publish(exchange='',
                             routing_key=props.reply_to,  # 回信息队列名
                             properties=pika.BasicProperties(correlation_id=
                                                             props.correlation_id),
                             body=str(response))
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        def start(self):
            self.channel.basic_consume(self.on_request,
                                       queue=self.queue_name)
    
            print(" [x] Awaiting RPC requests")
            self.channel.start_consuming()
    
    
    if __name__ == "__main__":
        rabbitmq = "localhost"      #rabbitmq服务器地址
        queue_name = "192.168.20.22"    #queue_name为本地ip地址
        server = Server(rabbitmq,queue_name)
        server.start()
    复制代码

    客户端:

    bin目录:

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    import sys
    import os
    import platform
    
    
    #添加BASE_DIR,添加顶级目录到路径中,方便调用其他目录模块
    if platform.system() == 'Windows':
        print(os.path.abspath(os.path.dirname(__file__)).split('\')[:-1])
        BASE_DIR = '\'.join(os.path.abspath(os.path.dirname(__file__)).split('\')[:-1])
    else:
        BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])
    
    
    #加载环境变量
    sys.path.append(BASE_DIR)
    from conf import settings
    from core import main
    
    if __name__ == '__main__':
        obj = main.Handler()
        obj.start()
    

    conf目录:

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    
    import os
    import sys
    import platform
    
    
    if platform.system() == 'Windows':
        BASE_DIR = '\'.join(os.path.abspath(os.path.dirname(__file__)).split('\')[:-1])
        school_dbpaths = os.path.join(BASE_DIR,'school_db')
    
    else:
        BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])
        school_dbpaths =os.path.join(BASE_DIR, 'school_db')
    
    
    #rabbitmq服务地址ip
    RabbitMQ_IP = 'localhost'
    

    core目录

    from 作业.rpc_client.conf import settings
    from 作业.rpc_client.modules.client import Client
    import random,time
    import threading
    
    class Handler(object):
        def __init__(self):
            self.information = {}   # 后台进程信息
    
        def check_all(self,*args):
            '''查看所有task_id信息'''
            time.sleep(2)
            for key in self.information:
                print("TASK_ID【%s】	HOST【%s】	COMMAND【%s】"%(key,self.information[key][0],
                                                                        self.information[key][1]))
    
        def check_task(self,user_cmd):
            '''查看task_id执行结果'''
            time.sleep(2)
            try:
                task_id = user_cmd.split()[1]
                task_id = int(task_id)
                callback_queue=self.information[task_id][2]
                callback_id=self.information[task_id][3]
                client = Client()
                response = client.get_response(callback_queue, callback_id)
                print(response.decode())
                del self.information[task_id]
    
            except KeyError  as e :
                print("33[31;0mWrong id[%s]33[0m"%e)
            except IndexError as e:
                print("33[31;0mWrong id[%s]33[0m"%e)
    
        def run(self,user_cmd):
            '''执行命令'''
            try:
                time.sleep(2)
                #print("--->>",user_cmd)
                command = user_cmd.split(""")[1]
                hosts = user_cmd.split()[3:]
                for host in hosts:
                    task_id = random.randint(10000, 99999)
                    client = Client()
                    response = client.call(host, command)
                    # print(response)
                    self.information[task_id] = [host, command, response[0],response[1]]
            except IndexError as e:
                print("33[31;0mError:%s33[0m"%e)
    
        def reflect(self,str,user_cmd):
            '''反射'''
            if hasattr(self, str):
                getattr(self, str)(user_cmd)
            # else:
            #     setattr(self, str, self.foo)
            #     getattr(self, str)()
    
        def start(self):
            while True:
                user_cmd = input("->>").strip()
                if not user_cmd:continue
                str = user_cmd.split()[0]
                t1 = threading.Thread(target=self.reflect,args=(str,user_cmd))  #多线程
                t1.start()
    

    modules目录

    import pika
    import uuid
    from 作业.rpc_client.conf import settings
    
    class Client(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=settings.RabbitMQ_IP))
            self.channel = self.connection.channel()
    
        def on_response(self, ch, method, props, body):
            '''获取命令执行结果的回调函数'''
            # print("验证码核对",self.callback_id,props.correlation_id)
            if self.callback_id == props.correlation_id:  # 验证码核对
                self.response = body
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        def get_response(self,callback_queue,callback_id):
            '''取队列里的值,获取callback_queued的执行结果'''
            self.callback_id = callback_id
            self.response = None
            self.channel.basic_consume(self.on_response,  # 只要收到消息就执行on_response
                                       queue=callback_queue)
            while self.response is None:
                self.connection.process_data_events()  # 非阻塞版的start_consuming
            return self.response
    
        def call(self,queue_name,command):
            '''队列里发送数据'''
            result = self.channel.queue_declare(exclusive=False) #exclusive=False 必须这样写
            self.callback_queue = result.method.queue
            self.corr_id = str(uuid.uuid4())
            # print(self.corr_id)
            self.channel.basic_publish(exchange='',
                                       routing_key=queue_name,
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,  # 发送返回信息的队列name
                                           correlation_id=self.corr_id,  # 发送uuid 相当于验证码
                                       ),
                                       body=command)
    
            return self.callback_queue,self.corr_id
    

    运行示例图

  • 相关阅读:
    javaee
    前后台页面跳转及参数传递
    easyu几个常见问题
    利用easyUI填充表格数据
    easyUI数据转换为不同级别的数据
    Java数据结构漫谈-Vector
    RxJava漫谈-RxAndroid使用
    Java数据结构漫谈-Stack
    Java性能漫谈-数组复制之System.arraycopy
    Java数据结构漫谈-LinkedList
  • 原文地址:https://www.cnblogs.com/fuyuteng/p/9283343.html
Copyright © 2011-2022 走看看