zoukankan      html  css  js  c++  java
  • 〖Demo〗-- 基于RabbitMQ rpc实现的主机管理

    【基于RabbitMQ rpc实现的主机管理】

    要求:

    文件分布:

    流程图:

    import pika
    import os
    import socket
    
    class Server(object):
        """RPC server that executes shell commands received over RabbitMQ.

        Each request body is run as a shell command on this host and the
        command's output is published back to the queue named in the request's
        ``reply_to`` property, tagged with the request's ``correlation_id``.
        """

        def __init__(self, queuename):
            """Connect to the broker on localhost and declare the request queue.

            :param queuename: name of the queue this server consumes requests from.
            """
            self.queuename = queuename
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost'))
            self.channel = self.connection.channel()  # open a channel on the connection
            self.channel.queue_declare(queue=self.queuename)

        def handle(self, command):
            """Run ``command`` in a shell and return what it printed.

            stderr is merged into stdout so that the error text of a failing
            command is sent back to the caller instead of being lost on the
            server's console.

            :param command: the command line as bytes (the raw message body).
            :return: the command's combined output; ``'wrong command'`` when the
                command failed without printing anything; a short notice when it
                succeeded without printing anything.
            """
            # Local import keeps this block self-contained.
            import subprocess

            # NOTE(security): the text comes straight from the queue and is run
            # through a shell, so anyone who can publish to this queue can run
            # arbitrary commands on this host. Restrict broker access accordingly.
            completed = subprocess.run(
                command.decode(),
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
            )
            message = completed.stdout
            if message:
                return message
            if completed.returncode != 0:
                return 'wrong command'
            # A silent success (e.g. ``touch file``) is not an error.
            return 'command finished with no output'

        def on_requet(self, ch, method, props, body):
            """Handle one request: run it, publish the reply, then ack.

            :param ch: channel the request arrived on; also used for the reply.
            :param method: delivery metadata; supplies the delivery tag to ack.
            :param props: request properties; supplies ``reply_to`` and
                ``correlation_id``.
            :param body: the command to execute, as bytes.
            """
            response = self.handle(body)
            print(response)
            ch.basic_publish(exchange='',
                             routing_key=props.reply_to,  # reply queue chosen by the client
                             properties=pika.BasicProperties(correlation_id=props.correlation_id),
                             body=str(response))
            # Ack only after the reply has been published.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def start(self):
            """Register ``on_requet`` as the consumer and block serving requests."""
            # NOTE(review): callback-first argument order looks like the pre-1.0
            # pika API -- confirm against the installed pika version.
            self.channel.basic_consume(self.on_requet, queue=self.queuename)
            print(" [x] Awaiting RPC requests")
            self.channel.start_consuming()
    
    
    
    
    
    if __name__ == "__main__":
        # The request queue is named after the IP address that this machine's
        # own hostname resolves to.
        hostname = socket.gethostname()
        queue_name = ip = socket.gethostbyname(hostname)
        print(ip)
        server = Server(queue_name)
        server.start()
    server
    import pika
    import uuid
    import random
    import threading
    
    
    class Client(object):
        """RabbitMQ RPC client: publishes a request and later collects its reply."""

        def __init__(self):
            """Open a blocking connection to the broker on localhost and a channel."""
            params = pika.ConnectionParameters(host = 'localhost')
            self.connection = pika.BlockingConnection(params)
            self.channel = self.connection.channel()

        def on_response(self, ch, method, props, body):
            """Consumer callback: keep the body if it answers the awaited request.

            The delivery is acknowledged whether or not the correlation id
            matches.
            """
            is_awaited_reply = self.callback_id == props.correlation_id
            if is_awaited_reply:
                self.response = body
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def get_response(self, callback_queue, corr_id):
            """Block until the reply tagged ``corr_id`` arrives on ``callback_queue``.

            :param callback_queue: name of the queue the reply is published to.
            :param corr_id: correlation id of the request being awaited.
            :return: the reply body as delivered to ``on_response``.
            """
            self.response = None
            self.callback_id = corr_id
            self.channel.basic_consume(self.on_response, queue=callback_queue)
            while True:
                if self.response is not None:
                    return self.response
                # Pump connection I/O so that on_response can fire, instead of
                # handing control to start_consuming.
                self.connection.process_data_events()

        def call(self, queuename, n):
            """Publish request ``n`` to ``queuename`` without waiting for the reply.

            :param queuename: routing key / queue of the target server.
            :param n: request body to send.
            :return: ``(callback_queue, correlation_id)`` needed to fetch the reply
                later with ``get_response``.
            """
            # Declare the callback queue that the reply will be sent to.
            declared = self.channel.queue_declare(exclusive=False)
            self.callback_queue = declared.method.queue
            self.corr_id = str(uuid.uuid4())
            request_props = pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id = self.corr_id,
            )
            self.channel.basic_publish(
                exchange='',
                routing_key=queuename,
                properties=request_props,
                body = n,
            )
            return self.callback_queue, self.corr_id
    
    class Threading(object):
        """Interactive console that sends commands to remote hosts and tracks them.

        ``info`` maps a locally generated task id to
        ``[host, command, callback_queue, correlation_id]``.
        """

        # Console verbs that ``ref`` may dispatch to. Anything else the user
        # types (``ref``, ``thread``, ``info``, dunder names, ...) is rejected
        # rather than looked up as an attribute.
        _COMMANDS = ('run', 'check_all', 'check_task')

        def __init__(self):
            self.info = {}

        def check_all(self, cmd):
            """Print every task that has been submitted and not yet collected.

            :param cmd: the raw console line; unused, present so that every
                command shares the signature ``ref`` calls.
            :return: None
            """
            for task_id, task in list(self.info.items()):
                print('task id: %s, host: %s, command:%s' % (task_id, task[0], task[1]))

        def check_task(self, cmd_id):
            """Fetch and print the result of one task, then forget the task.

            :param cmd_id: the raw console line; its second word is the task id.
            :return: None
            """
            try:
                task_id = int(cmd_id.split()[1])
            except (IndexError, ValueError):
                print('usage: check_task <task id>')
                return
            task = self.info.get(task_id)
            if task is None:
                print('no such task id: %s' % task_id)
                return
            try:
                client = Client()
                try:
                    res = client.get_response(task[2], task[3])
                finally:
                    # One connection is opened per lookup; release it.
                    client.connection.close()
                print(res.decode())
                self.info.pop(task_id, None)
            except Exception as e:
                # Console boundary: report broker/decoding problems and keep
                # the prompt alive.
                print(e)

        @staticmethod
        def _parse_run(cmd):
            """Split a ``run`` line into ``(command, hosts)``; None if malformed."""
            pieces = cmd.split('"')
            if len(pieces) < 3:
                return None  # no quoted command
            command = pieces[1]
            # Search for the option only after the closing quote, so a "--"
            # inside the quoted command is not mistaken for it.
            options = '"'.join(pieces[2:]).split("--")
            if len(options) < 2:
                return None  # no option introducing the host list
            # The first word is the option name itself; the rest are hosts.
            hosts = options[1].split()[1:]
            if not hosts:
                return None
            return command, hosts

        def _new_task_id(self):
            """Return a random five-digit task id that is not already in use."""
            while True:
                task_id = random.randint(10000, 99999)
                if task_id not in self.info:
                    return task_id

        def run(self, cmd):
            """Send the quoted command to each listed host and record the tasks.

            Expected form: ``run "<command>" --hosts <host> [<host> ...]``.

            :param cmd: the raw console line.
            :return: ``self.info`` after the new tasks were added, or None when
                the line could not be parsed (a usage hint is printed).
            """
            parsed = self._parse_run(cmd)
            if parsed is None:
                print('usage: run "<command>" --hosts <host> [<host> ...]')
                return None
            comm, hosts = parsed
            for host in hosts:
                task_id = self._new_task_id()
                client = Client()
                try:
                    callback_queue, corr_id = client.call(host, comm)
                finally:
                    # The reply is fetched later over a fresh connection, so
                    # this one is no longer needed once the request is sent.
                    client.connection.close()
                self.info[task_id] = [host, comm, callback_queue, corr_id]
            return self.info

        def ref(self, cmd):
            """Dispatch a console line to the command named by its first word.

            If the command returns a task table, every entry in it is printed.

            :param cmd: the raw console line.
            :return: None
            """
            words = cmd.split()
            if not words or words[0] not in self._COMMANDS:
                print('unknown command, expected one of: %s' % ', '.join(self._COMMANDS))
                return
            tasks = getattr(self, words[0])(cmd)
            if tasks is not None:
                # Iterate over a snapshot: other worker threads may add or
                # remove tasks concurrently.
                for task_id, task in list(tasks.items()):
                    print('task id: %s, host: %s, command:%s' % (task_id, task[0], task[1]))

        def thread(self):
            """Read console lines forever, handling each one in its own thread."""
            while True:
                try:
                    cmd = input("->>").strip()
                except EOFError:
                    break  # stdin closed: stop prompting instead of crashing
                if not cmd:
                    continue
                worker = threading.Thread(target=self.ref, args=(cmd, ))
                worker.start()
    
    
    
    
    if __name__ == "__main__":
        # Start the interactive prompt only when executed as a script, so that
        # importing this module does not block on console input.
        obj = Threading()
        res = obj.thread()
    client
  • 相关阅读:
    mysql数据库基本类型
    常用辅助类【转】
    Java 并发笔记】并发机制底层实现整理[转发]
    关于PROPAGATION_NESTED的理解
    线程数设置
    c# Expression 扩展[转]
    Net定时器 【转载】
    【转】高可用设计-58沈剑
    【转】委托的三种调用示例(同步调用 异步调用 异步回调)
    [coursera OA] acme match
  • 原文地址:https://www.cnblogs.com/SHENGXIN/p/8097860.html
Copyright © 2011-2022 走看看