@version:
@author: morgana
@license: Apache Licence
@contact: vipmorgana@gmail.com
@site:
@software: PyCharm
@file: rpcclient.py
@time: 2018/4/15 上午12:28
"""
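# 1. Declare a queue to act as the reply_to queue that receives results
# 2. Publish a message to the request queue, carrying a unique identifier (uid) and reply_to
# 3. Listen on the reply_to queue until a result arrives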
import queue
import pika
import uuid
class CMDRpcClient(object):
    """Blocking RPC client that sends a command over RabbitMQ and waits for its result.

    On construction the client connects to the broker at 127.0.0.1, declares an
    exclusive reply queue and starts consuming from it. Each ``call`` publishes
    one request to ``rpc_queue2`` and blocks until a reply carrying the same
    correlation id arrives on the reply queue.
    """

    def __init__(self):
        # No request is in flight yet. Set these before consuming starts so
        # on_response never reads an attribute that does not exist.
        self.response = None
        self.corr_id = None

        credentials = pika.PlainCredentials('morgana', '123456')
        parameters = pika.ConnectionParameters(host='127.0.0.1', credentials=credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        # Declare an exclusive queue that is passed to the server as reply_to;
        # its name is read back from the declare result.
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue  # queue that receives command results
        # Register on_response as the consumer of the reply queue.
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """Store the reply body if it answers the request currently in flight.

        :param ch: channel the message was delivered on (unused)
        :param method: delivery metadata (unused)
        :param props: message properties; ``props.correlation_id`` is compared
            with the id generated by ``call``
        :param body: raw reply bytes
        :return: None
        """
        if self.corr_id == props.correlation_id:
            # Replies are decoded as GBK, as before. Bytes that are not valid
            # GBK are replaced instead of raising UnicodeDecodeError, which
            # would otherwise abort the wait loop in call().
            self.response = body.decode("gbk", errors="replace")

    def call(self, n):
        """Send ``n`` as a request and block until the matching reply arrives.

        :param n: request payload; it is converted with ``str(n)`` before publishing
        :return: the reply body decoded to ``str``
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())  # unique id generated for this request
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue2',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            # time_limit=None lets pika block until I/O produces events to
            # dispatch, rather than returning immediately (the default
            # time_limit=0) and spinning the CPU in this loop.
            self.connection.process_data_events(time_limit=None)
        return self.response
# Demo: send one command to the RPC server and print the reply.
cmd_rpc = CMDRpcClient()
# The request sent below is 'ifconfig', so the progress message names it
# (it previously claimed to be requesting fib(30)).
print(" [x] Requesting run(ifconfig)")
response = cmd_rpc.call('ifconfig')
print(response)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@version:
@author: morgana
@license: Apache Licence
@contact: vipmorgana@gmail.com
@site:
@software: PyCharm
@file: rpcserver.py
@time: 2018/4/15 上午12:29
"""
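# 1. Define run_cmd, which runs a shell command and captures its output
#    (fib is also defined below, but the request handler does not call it)
# 2. Declare the queue that receives commands: rpc_queue2
# 3. Start consuming the queue; on each message, call run_cmd
# 4. Send the command output back to the reply_to queue named by the client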
import subprocess
import pika
# Connect to the local broker and declare the queue that carries RPC requests.
credentials = pika.PlainCredentials(username='morgana', password='123456')
parameters = pika.ConnectionParameters(
    host='127.0.0.1',
    credentials=credentials,
)
connection = pika.BlockingConnection(parameters)
# Channel used for declaring and consuming the request queue.
channel = connection.channel()
channel.queue_declare(queue='rpc_queue2')
def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed with a linear-time loop instead of naive double recursion, so
    large ``n`` completes quickly and cannot exhaust the recursion limit.

    :param n: non-negative whole number (an integral float such as 2.0 is accepted)
    :return: the n-th Fibonacci number as an int
    :raises ValueError: if ``n`` is negative or not a whole number
    """
    # Negative or fractional input previously recursed without ever reaching
    # a base case; reject it explicitly.
    if n < 0 or n != int(n):
        raise ValueError("fib() requires a non-negative whole number, got %r" % (n,))
    previous, current = 0, 1
    for _ in range(int(n)):
        previous, current = current, previous + current
    return previous
def run_cmd(cmd):
    """Run ``cmd`` through the shell and return its captured output.

    :param cmd: command line string, executed with ``shell=True``
    :return: bytes consisting of the command's stdout followed by its stderr
    """
    # NOTE(security): cmd is executed by the shell exactly as given. In this
    # file it comes straight from a queue message, so anyone able to publish
    # to that queue can run arbitrary commands on this host.
    cmd_obj = subprocess.Popen(cmd, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes together and waits for the child to
    # exit. Reading stdout to EOF before touching stderr can deadlock once the
    # child fills the stderr pipe, and never reaps the process.
    stdout_data, stderr_data = cmd_obj.communicate()
    return stdout_data + stderr_data
def on_request(ch, method, props, body):
    """Handle one RPC request: run the received command and publish its output.

    :param ch: channel the request arrived on; used to publish the reply and to ack
    :param method: delivery metadata; ``method.delivery_tag`` is acknowledged
    :param props: request properties; ``reply_to`` names the reply queue and
        ``correlation_id`` is copied onto the reply
    :param body: request payload, decoded as UTF-8 to obtain the command
    :return: None
    """
    command = body.decode("utf-8")
    print(" [.] run (%s)" % command)
    output = run_cmd(command)

    # Copy the caller's correlation id onto the reply.
    reply_properties = pika.BasicProperties(correlation_id=props.correlation_id)
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,  # reply queue named by the client
        properties=reply_properties,
        body=output,
    )
    # Acknowledge only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Register on_request as the consumer callback for the 'rpc_queue2' queue.
channel.basic_consume(on_request, queue='rpc_queue2')
print(" [x] Awaiting RPC requests")
# Hand control to pika's consume loop; presumably this blocks while
# dispatching deliveries to on_request — confirm against the pika docs.
channel.start_consuming()