在RabbitMQ消息队列中,往往接收者、发送者不止是一个身份。例如接接收者收到消息并且需要返回给发送者。
此时接收者、发送者的身份不再固定!
我们来模拟该情形:
假设有客户端client,服务端server。
我们需要从客户端发送数据,通过服务端的计算后再返回给客户端。
client.py
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 import pika 4 import uuid 5 6 class Client(object): 7 def __init__(self): 8 hostname = '192.168.1.133' 9 parameters = pika.ConnectionParameters(hostname) 10 self.connection = pika.BlockingConnection(parameters) 11 12 self.channel = self.connection.channel() 13 # 定义接收返回消息的队列 14 result = self.channel.queue_declare(exclusive=True) 15 self.callback_queue = result.method.queue 16 17 self.channel.basic_consume(self.on_response, 18 no_ack=True, 19 queue=self.callback_queue) 20 self.response = None 21 self.corr_id = '' 22 23 # 定义接收到返回消息的处理方法 24 def on_response(self, ch, method, props, body): 25 if self.corr_id == props.correlation_id: 26 self.response = body 27 28 def request(self, n): 29 # self.response = None 30 self.corr_id = str(uuid.uuid4()) 31 # 发送计算请求,并声明返回队列 32 self.channel.basic_publish(exchange='', 33 routing_key='count_queue', 34 properties=pika.BasicProperties( 35 reply_to=self.callback_queue, 36 correlation_id=self.corr_id, 37 ), 38 body=str(n)) 39 # 接收返回的数据 40 while self.response is None: 41 self.connection.process_data_events() 42 return int(self.response) 43 44 client = Client() 45 46 print " [*] Requesting fib(30)" 47 response = client.request(30) 48 print " [*] Got fib(30)= %r" % response
server.py
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 import pika 4 5 hostname = '192.168.1.133' 6 parameters = pika.ConnectionParameters(hostname) 7 connection = pika.BlockingConnection(parameters) 8 9 # 创建通道 10 channel = connection.channel() 11 # 声明队列 12 channel.queue_declare(queue='count_queue') 13 print ' [*] Waiting for n' 14 15 # 算法 16 def fib(n): 17 if n == 0: 18 return 0 19 elif n == 1: 20 return 1 21 else: 22 return fib(n - 1) + fib(n - 2) 23 24 # 设置回调函数 25 def on_request(ch, method, props, body): 26 n = int(body) 27 response = fib(n) 28 print " [*] fib(%s)" % n, '=', response 29 # 将结果反馈 30 ch.basic_publish(exchange='', 31 routing_key=props.reply_to, 32 properties=pika.BasicProperties(correlation_id=props.correlation_id), 33 body=str(response)) 34 ch.basic_ack(delivery_tag=method.delivery_tag) 35 36 #公平调度 37 channel.basic_qos(prefetch_count=1) 38 channel.basic_consume(on_request, queue='count_queue') 39 print " [*] Awaiting count_queue requests" 40 channel.start_consuming()
结果如下:
服务端: 客户端: