zoukankan      html  css  js  c++  java
  • 基于RabbitMQ rpc实现的主机管理

    要求:

    文件分布:

    流程图:

    import pika
    import os
    import socket
    
    class Server(object):
        """RPC server that runs shell commands received over a RabbitMQ queue.

        Each request body is executed as a shell command on this host and the
        captured standard output is published back to the queue named in the
        request's ``reply_to`` property.

        SECURITY: every message on the queue is passed verbatim to the shell,
        so anyone able to publish to the broker can run arbitrary commands on
        this machine. Restrict access to the broker accordingly.
        """

        def __init__(self, queuename):
            """Connect to the broker on localhost and declare the request queue.

            :param queuename: name of the queue this server consumes from.
            """
            self.queuename = queuename
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            self.channel = self.connection.channel()  # open a channel
            self.channel.queue_declare(queue=self.queuename)

        def handle(self, command):
            """Run *command* through the shell and return its standard output.

            :param command: the command line, as ``bytes`` (decoded with the
                default codec) or as ``str``.
            :return: the text the command wrote to stdout, or
                ``'wrong command'`` when it wrote nothing.
            """
            if isinstance(command, bytes):
                command = command.decode()
            # Close the pipe explicitly so the child process is waited for
            # instead of being left to the garbage collector.
            with os.popen(command) as pipe:
                message = pipe.read()
            if not message:
                message = 'wrong command'
            return message

        def on_requet(self, ch, method, props, body):
            """Execute one request and publish the result to the reply queue.

            The method name keeps its original spelling because it is part of
            the class's existing interface.

            :param ch: channel the request arrived on.
            :param method: delivery information (provides ``delivery_tag``).
            :param props: request properties (provides ``reply_to`` and
                ``correlation_id``).
            :param body: the command to run.
            """
            response = self.handle(body)
            print(response)
            ch.basic_publish(exchange='',
                             routing_key=props.reply_to,  # callback queue chosen by the client
                             properties=pika.BasicProperties(correlation_id=props.correlation_id),
                             body=str(response))
            # Acknowledge only after the reply has been published.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def start(self):
            """Register :meth:`on_requet` as the consumer and block handling requests."""
            # NOTE(review): callback-first argument order matches pika 0.x;
            # pika >= 1.0 appears to expect basic_consume(queue, on_message_callback)
            # -- confirm against the installed version.
            self.channel.basic_consume(self.on_requet, queue=self.queuename)
            print(" [x] Awaiting RPC requests")
            self.channel.start_consuming()
    
    
    
    
    
    if __name__ == "__main__":
        # The request queue is named after this machine's IP address, so
        # clients address a host simply by publishing to its IP.
        local_ip = socket.gethostbyname(socket.gethostname())
        print(local_ip)
        Server(local_ip).start()
    server
      1 import pika
      2 import uuid
      3 import random
      4 import threading
      5 
      6 
      class Client(object):
          """RPC client: publishes a command to a host queue and collects the reply."""

          def __init__(self):
              """Open a blocking connection and a channel to the broker on localhost."""
              self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
              self.channel = self.connection.channel()
              # Set by get_response(); defined here so on_response() never
              # meets a missing attribute.
              self.response = None
              self.callback_id = None

          def on_response(self, ch, method, props, body):
              """Store *body* as the response when its correlation id matches.

              The delivery is acknowledged whether or not it matched.
              """
              if self.callback_id == props.correlation_id:
                  self.response = body
              ch.basic_ack(delivery_tag=method.delivery_tag)

          def get_response(self, callback_queue, corr_id):
              """Wait for the reply carrying *corr_id* on *callback_queue*.

              :param callback_queue: name of the queue the reply is delivered to.
              :param corr_id: correlation id of the request being awaited.
              :return: the reply body as delivered by the broker.
              """
              self.response = None
              self.callback_id = corr_id
              self.channel.basic_consume(self.on_response, queue=callback_queue)
              while self.response is None:
                  # Pump connection events until on_response() stores a reply.
                  self.connection.process_data_events()
              return self.response

          def call(self, queuename, n):
              """Publish command *n* to *queuename* without waiting for the reply.

              :param queuename: request queue of the target host.
              :param n: the command to send as the message body.
              :return: ``(callback_queue, correlation_id)`` identifying where
                  and how the reply can be collected later.
              """
              # Declare the callback queue as non-exclusive: the reply is
              # collected later over a different connection (see get_response).
              result = self.channel.queue_declare(exclusive=False)
              self.callback_queue = result.method.queue
              self.corr_id = str(uuid.uuid4())
              self.channel.basic_publish(exchange='',
                                         routing_key=queuename,
                                         properties=pika.BasicProperties(
                                             reply_to=self.callback_queue,
                                             correlation_id=self.corr_id,
                                         ),
                                         body=n)
              return self.callback_queue, self.corr_id
     38 
     class Threading(object):
         """Interactive prompt that runs each typed command in its own thread.

         Supported commands:
             run "<command>" --hosts <ip> [<ip> ...]
             check_all
             check_task <task id>
         """

         # Only these method names may be invoked from the prompt; dispatching
         # to arbitrary attributes would reach ``info``, ``thread`` or ``ref``.
         COMMANDS = ('run', 'check_all', 'check_task')

         def __init__(self):
             # task id -> [host, command, callback queue, correlation id]
             self.info = {}

         def check_all(self, cmd):
             """Print every pending task id with its host and command.

             :param cmd: the full command line (unused).
             :return: None
             """
             for task_id, task in self.info.items():
                 print('task id: %s, host: %s, command:%s' % (task_id, task[0], task[1]))

         def check_task(self, cmd_id):
             """Fetch and print the result of one task, then forget the task.

             :param cmd_id: command line of the form ``check_task <task id>``.
             :return: None
             """
             try:
                 task_id = int(cmd_id.split()[1])
             except (IndexError, ValueError):
                 print('usage: check_task <task id>')
                 return
             try:
                 task = self.info[task_id]
             except KeyError:
                 print('no such task id: %s' % task_id)
                 return
             try:
                 client = Client()
                 reply = client.get_response(task[2], task[3])
                 print(reply.decode())
                 del self.info[task_id]
             except Exception as e:
                 # Runs inside a worker thread: report broker errors and
                 # carry on rather than kill the thread with a traceback.
                 print(e)

         @staticmethod
         def _parse_run(cmd):
             """Split a ``run`` command line into ``(command, hosts)``.

             ``command`` is the text between the first pair of double quotes,
             or None when there is no quoted command. ``hosts`` is the list of
             words following the option name after the closing quote.
             """
             pieces = cmd.split('"')
             if len(pieces) < 3:
                 return None, []
             command = pieces[1]
             # Search for the option only after the closing quote, so a "--"
             # inside the quoted command is not mistaken for the host option.
             tail = '"'.join(pieces[2:])
             option = tail.partition('--')[2]
             return command, option.split()[1:]  # drop the option name itself

         def run(self, cmd):
             """Send a command to every listed host and record one task per host.

             :param cmd: ``run "<command>" --hosts <ip> [<ip> ...]``
             :return: the task table (``self.info``), or None when the command
                 line could not be parsed.
             """
             comm, hosts = self._parse_run(cmd)
             if comm is None or not hosts:
                 print('usage: run "<command>" --hosts <ip> [<ip> ...]')
                 return None
             for host in hosts:
                 task_id = random.randint(10000, 99999)
                 while task_id in self.info:
                     # Never overwrite a task that is still pending.
                     task_id = random.randint(10000, 99999)
                 client = Client()
                 callback_queue, corr_id = client.call(host, comm)
                 self.info[task_id] = [host, comm, callback_queue, corr_id]
             return self.info

         def ref(self, cmd):
             """Dispatch *cmd* to the method named by its first word.

             When the method returns a task table, its entries are printed.

             :param cmd: the full command line typed by the user.
             :return: None
             """
             words = cmd.split()
             if not words:
                 return
             name = words[0]
             if name not in self.COMMANDS:
                 print('unknown command: %s' % name)
                 return
             result = getattr(self, name)(cmd)
             if result is not None:
                 for task_id in result:
                     print('task id: %s, host: %s, command:%s' % (task_id, result[task_id][0], result[task_id][1]))

         def thread(self):
             """Read commands forever, handling each one in a new thread."""
             while True:
                 cmd = input("->>").strip()
                 if not cmd:
                     continue
                 worker = threading.Thread(target=self.ref, args=(cmd, ))
                 worker.start()
    101 
    102 
    103 
    104 
    # Start the interactive prompt; thread() loops reading commands from stdin.
    obj = Threading()
    res = obj.thread()
    client
  • 相关阅读:
    jekyll+livereload+chrome插件-更新文件后自动刷新
    boostraps+jekyll+sass/scss+less+grunt整合使用详细备忘
    Centos使用keepalived配置MySQL双主热备集群
    MySQL数据库的集群方案
    Nginx使用Lua脚本加解密RSA字符串
    Nginx使用Lua脚本连接Redis验证身份并下载文件
    Centos安装ELK
    Centos7中搭建Redis6集群操作步骤
    only-office以Docker方式安装使用
    Kafka笔记
  • 原文地址:https://www.cnblogs.com/nikitapp/p/6836949.html
Copyright © 2011-2022 走看看