RabbitMQ RPC模型
RPC(remote procedure call)模型说通俗一点就是客户端发一个请求给远程服务端,让它去执行,然后服务端端再把执行的结果再返回给客户端。

1、服务端
import pika
#创建socket实例,声明管道,声明queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="rpc_queue")
def fib(n):
"""
斐波那契数列
:param n:
:return:
"""
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body): # props 是客户端发过来的消息
n = int(body)
print("fib(%s)" % n)
response = fib(n)
# 发布消息
ch.basic_publish(exchange="",
routing_key=props.reply_to, # props.reply_to从客户端取出双方约定好存放返回结果的queue
properties=pika.BasicProperties # 定义一些基本属性
(correlation_id=props.correlation_id), # props.correlation_id 从客户端取出当前请求的ID返回给客户端做验证
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认消息被消费
channel.basic_qos(prefetch_count=1) # 每次最多处理一个客户端发过来的消息
# 消费消息
channel.basic_consume(on_request, # 回调函数
queue="rpc_queue")
print("Awaiting RPC requests")
channel.start_consuming()
2、客户端
import pika
import uuid
import time
class FibonacciRpcClient(object):
"""
斐波那契数列rpc客户端
"""
def __init__(self):
"""
定义好创建socket实例、声明管道、声明随机产生的唯一queue、消费信息的静态变量
"""
self.connection = pika.BlockingConnection(pika.ConnectionParameters
(host="localhost"))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
print("---->", method, props)
# 当服务端返回的id跟当初请求的id一致时,再去读取服务端发送的信息保持数据的一致性
if self.corr_id == props.correlation_id: # 当服务端返回的id跟当初请求的id一致时,保持数据的一致性
self.response = body
def call(self,n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.publish(exchange="",
routing_key="rpc_queue", # 双方的request所用的queue
properties=pika.BasicProperties( # 定义基本属性
reply_to=self.callback_queue, # 定义客户端服务端双方response的所用的Q
correlation_id=self.corr_id), # 定义这次request的唯一ID
body=str(n))
while self.response is None:
self.connection.process_data_events() # 非 阻塞版的start_consumer()
print("no msg....")
time.sleep(0.5)
return int(self.response)
if __name__ == "__main__":
fibonacci_rpc = FibonacciRpcClient()
print("Requesting fib(8)")
response = fibonacci_rpc.call(8)
print("Got %r" % response)
3、输出
服务端:
Awaiting RPC requests fib(8)
客户端:
Requesting fib(8) no msg.... ----> <Basic.Deliver(['consumer_tag=ctag1.cf2e7983c7d840db8c68f4571472c18d', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=amq.gen-ezXgs0tRO5SldZeRH97VPw'])> <BasicProperties(['correlation_id=601e860d-c93d-4c94-959a-3a39be177f7c'])> no msg....
Got 21