zoukankan      html  css  js  c++  java
  • 【项目】:RPC异步执行命令(RabbitMQ双向通信)

    RPC异步执行命令
    需求:
    • 利用RibbitMQ进行数据交互
    • 可以对多台服务器进行操作
    • 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
    • 实现异步操作

    不懂rpc的请移步http://www.cnblogs.com/lianzhilei/p/5977545.html(最下边)

    本节涉及最多的还是rabbitmq通信原理知识,要求安装rabbitmq服务

    程序用广播topic模式做更好 

    程序目录结构:

    程序简介:

    # 异步rpc程序
    
    # 博客地址
    [第11天博客地址](http://www.cnblogs.com/lianzhilei/p/5970176.html)
                   (http://www.cnblogs.com/lianzhilei/p/5970176.html)
    
    ## 1、需求
    - [ ] 利用RibbitMQ进行数据交互
    - [ ] 可以对多台服务器进行操作
    - [ ] 执行命令后不等待命令的执行结果,而是直接让输入下一条命令,结果出来后自动打印
    - [ ] 实现异步操作
    
    ## 备注
    
    - [ ] RabbitMQ队列名:
                        ①执行命令时,队列名为服务器端的IP
                        ②查询数据时,用的是回调时随机生成的callback_queue名
    - [ ] threading多线程:
                        实现命令执行后不等待执行结果,依然可以输入新的指令
    
    - [ ] 执行命令格式:
                     -->>run "dir" host 192.168.20.22 192.168.20.23
                            dir     server端要执行的命令
                            host    host后可跟一个或多个可以通过rabbitMQ的服务器地址
    
    - [ ] 查看后台所有的TASK_ID信息:
                     -->>check_all
         显示结果样式:TASK_ID【76786】    HOST【192.168.20.22】    COMMAND【dir】
                      TASK_ID【10307】    HOST【192.168.20.23】    COMMAND【dir】
    
    - [ ] 查看TASK_ID对应的执行结果:
                     -->>check_task 10307
                             10307 为check_all查到的TASK_ID
    README.md

    程序流程图:

    服务器端:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #-Author-Lian
    
    # !/usr/bin/env python
    # -*- coding:utf-8 -*-
    # -Author-Lian
    
    import pika
    import os
    
    class Server(object):
        def __init__(self,rabbitmq,queue_name):
            self.queue_name = queue_name
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=rabbitmq))
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue=self.queue_name)
    
        def handle(self,command):
            command = command.decode()
            print(command,type(command))
            message = os.popen(command).read()
            if not message:
                message = "Wrong Command"
            return message
    
        def on_request(self,ch, method, props, body):
            response = self.handle(body)
            ch.basic_publish(exchange='',
                             routing_key=props.reply_to,  # 回信息队列名
                             properties=pika.BasicProperties(correlation_id=
                                                             props.correlation_id),
                             body=str(response))
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        def start(self):
            self.channel.basic_consume(self.on_request,
                                       queue=self.queue_name)
    
            print(" [x] Awaiting RPC requests")
            self.channel.start_consuming()
    
    
    if __name__ == "__main__":
        rabbitmq = "localhost"      #rabbitmq服务器地址
        queue_name = "192.168.20.22"    #queue_name为本地ip地址
        server = Server(rabbitmq,queue_name)
        server.start()
    server.py

    客户端:

    bin目录:

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    
    import sys
    import os
    import platform
    
    
    #添加BASE_DIR,添加顶级目录到路径中,方便调用其他目录模块
    if platform.system() == 'Windows':
        print(os.path.abspath(os.path.dirname(__file__)).split('\')[:-1])
        BASE_DIR = '\'.join(os.path.abspath(os.path.dirname(__file__)).split('\')[:-1])
    else:
        BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])
    
    
    #加载环境变量
    sys.path.append(BASE_DIR)
    from conf import settings
    from core import main
    
    if __name__ == '__main__':
        obj = main.Handler()
        obj.start()
    start.py

    conf目录:

    #!/usr/bin/env python
    #-*- coding:utf-8 -*-
    __author__ = 'luotianshuai'
    
    import os
    import sys
    import platform
    
    
    if platform.system() == 'Windows':
        BASE_DIR = '\'.join(os.path.abspath(os.path.dirname(__file__)).split('\')[:-1])
        school_dbpaths = os.path.join(BASE_DIR,'school_db')
    
    else:
        BASE_DIR = '/'.join(os.path.abspath(os.path.dirname(__file__)).split('/')[:-1])
        school_dbpaths =os.path.join(BASE_DIR, 'school_db')
    
    
    #rabbitmq服务地址ip
    RabbitMQ_IP = 'localhost'
    settings.py

    core目录

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    #-Author-Lian
    
    from conf import settings
    from modules.client import Client
    import random,time
    import threading
    
    class Handler(object):
        def __init__(self):
            self.information = {}   # 后台进程信息
    
        def check_all(self,*args):
            '''查看所有task_id信息'''
            time.sleep(2)
            for key in self.information:
                print("TASK_ID【%s】	HOST【%s】	COMMAND【%s】"%(key,self.information[key][0],
                                                                        self.information[key][1]))
    
        def check_task(self,user_cmd):
            '''查看task_id执行结果'''
            time.sleep(2)
            try:
                task_id = user_cmd.split()[1]
                task_id = int(task_id)
                callback_queue=self.information[task_id][2]
                callback_id=self.information[task_id][3]
                client = Client()
                response = client.get_response(callback_queue, callback_id)
                print(response.decode())
                del self.information[task_id]
    
            except KeyError  as e :
                print("33[31;0mWrong id[%s]33[0m"%e)
            except IndexError as e:
                print("33[31;0mWrong id[%s]33[0m"%e)
    
        def run(self,user_cmd):
            '''执行命令'''
            try:
                time.sleep(2)
                #print("--->>",user_cmd)
                command = user_cmd.split(""")[1]
                hosts = user_cmd.split()[3:]
                for host in hosts:
                    task_id = random.randint(10000, 99999)
                    client = Client()
                    response = client.call(host, command)
                    # print(response)
                    self.information[task_id] = [host, command, response[0],response[1]]
            except IndexError as e:
                print("33[31;0mError:%s33[0m"%e)
    
        def reflect(self,str,user_cmd):
            '''反射'''
            if hasattr(self, str):
                getattr(self, str)(user_cmd)
            # else:
            #     setattr(self, str, self.foo)
            #     getattr(self, str)()
    
        def start(self):
            while True:
                user_cmd = input("->>").strip()
                if not user_cmd:continue
                str = user_cmd.split()[0]
                t1 = threading.Thread(target=self.reflect,args=(str,user_cmd))  #多线程
                t1.start()
    main.py

    modules目录

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # -Author-Lian
    
    import pika
    import uuid
    from conf import settings
    
    class Client(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=settings.RabbitMQ_IP))
            self.channel = self.connection.channel()
    
        def on_response(self, ch, method, props, body):
            '''获取命令执行结果的回调函数'''
            # print("验证码核对",self.callback_id,props.correlation_id)
            if self.callback_id == props.correlation_id:  # 验证码核对
                self.response = body
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        def get_response(self,callback_queue,callback_id):
            '''取队列里的值,获取callback_queued的执行结果'''
            self.callback_id = callback_id
            self.response = None
            self.channel.basic_consume(self.on_response,  # 只要收到消息就执行on_response
                                       queue=callback_queue)
            while self.response is None:
                self.connection.process_data_events()  # 非阻塞版的start_consuming
            return self.response
    
        def call(self,queue_name,command):
            '''队列里发送数据'''
            result = self.channel.queue_declare(exclusive=False) #exclusive=False 必须这样写
            self.callback_queue = result.method.queue
            self.corr_id = str(uuid.uuid4())
            # print(self.corr_id)
            self.channel.basic_publish(exchange='',
                                       routing_key=queue_name,
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,  # 发送返回信息的队列name
                                           correlation_id=self.corr_id,  # 发送uuid 相当于验证码
                                       ),
                                       body=command)
    
            return self.callback_queue,self.corr_id
    client.py

    运行示例图

  • 相关阅读:
    软件工程第九周总结
    作为使用者对qq拼音输入法和搜狗输入法的评价
    关于编写Windows程序中启动兼容性问题
    软件工程第八周总结
    Java实验--关于课上找“水王”问题分析
    大道至简阅读笔记03
    家庭记账本-----一
    《人月神话》读后感----一到三章
    Java实现数据库与eclipse的连接
    流和文件
  • 原文地址:https://www.cnblogs.com/lianzhilei/p/5983673.html
Copyright © 2011-2022 走看看