要求:
文件分布:
流程图:
import pika import os import socket class Server(object): def __init__(self, queuename): self.queuename = queuename self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost')) self.channel = self.connection.channel() #声明一个管道 self.channel.queue_declare(queue=self.queuename) def handle(self,command): message = os.popen(command.decode()).read() if not message: message = 'wrong command' return message def on_requet(self, ch, method,props,body): response = self.handle(body) print(response) ch.basic_publish(exchange='', routing_key=props.reply_to, #拿到客户端随机生成的queue properties = pika.BasicProperties(correlation_id = props.correlation_id), body = str(response)) ch.basic_ack(delivery_tag = method.delivery_tag)#确保任务完成 def start(self): self.channel.basic_consume(self.on_requet, queue=self.queuename) #收到消息就调用on_requet print(" [x] Awaiting RPC requests") self.channel.start_consuming() if __name__ == "__main__": hostname = socket.gethostname() ip = socket.gethostbyname(hostname) # 获取本地ip地址作为queue name print(ip) queue_name = ip server = Server(queue_name) server.start()
1 import pika 2 import uuid 3 import random 4 import threading 5 6 7 class Client(object): 8 def __init__(self): 9 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost')) 10 self.channel = self.connection.channel() 11 12 def on_response(self, ch, method, props, body): 13 if self.callback_id == props.correlation_id: 14 self.response = body 15 ch.basic_ack(delivery_tag=method.delivery_tag) 16 17 def get_response(self, callback_queue, corr_id): 18 self.response = None 19 self.callback_id = corr_id 20 self.channel.basic_consume(self.on_response, queue=callback_queue) 21 while self.response is None: 22 self.connection.process_data_events() # 非阻塞版的start_consuming 23 return self.response 24 25 def call(self, queuename, n): 26 # 声明临时的回调队列 27 result = self.channel.queue_declare(exclusive=False) 28 self.callback_queue = result.method.queue 29 self.corr_id = str(uuid.uuid4()) 30 self.channel.basic_publish(exchange='', 31 routing_key=queuename, 32 properties=pika.BasicProperties( 33 reply_to=self.callback_queue, 34 correlation_id = self.corr_id, 35 ), 36 body = n) 37 return self.callback_queue, self.corr_id 38 39 class Threading(object): 40 def __init__(self): 41 self.info={} 42 43 def check_all(self, cmd): 44 ''' 45 查看已经有的任务id 46 :param cmd: 47 :return: 48 ''' 49 for i in self.info: 50 print('task id: %s, host: %s, command:%s' % (i, self.info[i][0], self.info[i][1])) 51 52 def check_task(self, cmd_id): 53 ''' 54 查看运行结果 55 :param cmd_id: 56 :return: 57 ''' 58 try: 59 id = int(cmd_id.split()[1]) 60 callack_queue = self.info[id][2] 61 callack_id=self.info[id][3] 62 client = Client() 63 res = client.get_response(callack_queue, callack_id) 64 print(res.decode()) 65 del self.info[id] 66 except Exception as e: 67 print(e) 68 69 def run(self, cmd): 70 comm = cmd.split(""")[1] 71 hosts = cmd.split("--") 72 host = hosts[1].split()[1:] #拿ip地址 73 for i in host: 74 id = random.randint(10000,99999) 75 obj = Client() 76 res = obj.call(i, comm) 77 self.info[id] = [i,comm,res[0], res[1]] 78 return self.info 79 80 81 def ref(self, cmd): 82 ''' 83 反射 84 :param cmd: 85 :return: 86 ''' 87 str = cmd.split()[0] 88 if hasattr(self,str): 89 func = getattr(self,str) 90 r = func(cmd) 91 if r is not None: 92 for i in r: 93 print('task id: %s, host: %s, command:%s' % (i, r[i][0], r[i][1])) 94 95 def thread(self): 96 while True: 97 cmd = input("->>").strip() 98 if not cmd:continue 99 t1 = threading.Thread(target=self.ref, args=(cmd, )) 100 t1.start() 101 102 103 104 105 obj = Threading() 106 res = obj.thread()