RabbitMQ RPC模型
RPC(remote procedure call)模型说通俗一点就是客户端发一个请求给远程服务端,让它去执行,然后服务端端再把执行的结果再返回给客户端。
1、服务端
import pika #创建socket实例,声明管道,声明queue connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue="rpc_queue") def fib(n): """ 斐波那契数列 :param n: :return: """ if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): # props 是客户端发过来的消息 n = int(body) print("fib(%s)" % n) response = fib(n) # 发布消息 ch.basic_publish(exchange="", routing_key=props.reply_to, # props.reply_to从客户端取出双方约定好存放返回结果的queue properties=pika.BasicProperties # 定义一些基本属性 (correlation_id=props.correlation_id), # props.correlation_id 从客户端取出当前请求的ID返回给客户端做验证 body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) # 手动确认消息被消费 channel.basic_qos(prefetch_count=1) # 每次最多处理一个客户端发过来的消息 # 消费消息 channel.basic_consume(on_request, # 回调函数 queue="rpc_queue") print("Awaiting RPC requests") channel.start_consuming()
2、客户端
import pika import uuid import time class FibonacciRpcClient(object): """ 斐波那契数列rpc客户端 """ def __init__(self): """ 定义好创建socket实例、声明管道、声明随机产生的唯一queue、消费信息的静态变量 """ self.connection = pika.BlockingConnection(pika.ConnectionParameters (host="localhost")) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): print("---->", method, props) # 当服务端返回的id跟当初请求的id一致时,再去读取服务端发送的信息保持数据的一致性 if self.corr_id == props.correlation_id: # 当服务端返回的id跟当初请求的id一致时,保持数据的一致性 self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.publish(exchange="", routing_key="rpc_queue", # 双方的request所用的queue properties=pika.BasicProperties( # 定义基本属性 reply_to=self.callback_queue, # 定义客户端服务端双方response的所用的Q correlation_id=self.corr_id), # 定义这次request的唯一ID body=str(n)) while self.response is None: self.connection.process_data_events() # 非 阻塞版的start_consumer() print("no msg....") time.sleep(0.5) return int(self.response) if __name__ == "__main__": fibonacci_rpc = FibonacciRpcClient() print("Requesting fib(8)") response = fibonacci_rpc.call(8) print("Got %r" % response)
3、输出
服务端:
Awaiting RPC requests fib(8)
客户端:
Requesting fib(8) no msg.... ----> <Basic.Deliver(['consumer_tag=ctag1.cf2e7983c7d840db8c68f4571472c18d', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=amq.gen-ezXgs0tRO5SldZeRH97VPw'])> <BasicProperties(['correlation_id=601e860d-c93d-4c94-959a-3a39be177f7c'])> no msg....
Got 21