zoukankan      html  css  js  c++  java
  • python中RabbitMQ的使用(远程过程调用RPC)

    在RabbitMQ消息队列中,往往接收者、发送者不止是一个身份。例如接接收者收到消息并且需要返回给发送者。

    此时接收者、发送者的身份不再固定!

    我们来模拟该情形:

    假设有客户端client,服务端server。

    我们需要从客户端发送数据,通过服务端的计算后再返回给客户端。

    client.py

     1 #!/usr/bin/env python
     2 # -*- coding: utf-8 -*-
     3 import pika
     4 import uuid
     5 
     6 class Client(object):
     7     def __init__(self):
     8         hostname = '192.168.1.133'
     9         parameters = pika.ConnectionParameters(hostname)
    10         self.connection = pika.BlockingConnection(parameters)
    11 
    12         self.channel = self.connection.channel()
    13         # 定义接收返回消息的队列
    14         result = self.channel.queue_declare(exclusive=True)
    15         self.callback_queue = result.method.queue
    16 
    17         self.channel.basic_consume(self.on_response,
    18                                    no_ack=True,
    19                                    queue=self.callback_queue)
    20         self.response = None
    21         self.corr_id = ''
    22 
    23     # 定义接收到返回消息的处理方法
    24     def on_response(self, ch, method, props, body):
    25         if self.corr_id == props.correlation_id:
    26             self.response = body
    27 
    28     def request(self, n):
    29         # self.response = None
    30         self.corr_id = str(uuid.uuid4())
    31         # 发送计算请求,并声明返回队列
    32         self.channel.basic_publish(exchange='',
    33                                    routing_key='count_queue',
    34                                    properties=pika.BasicProperties(
    35                                        reply_to=self.callback_queue,
    36                                        correlation_id=self.corr_id,
    37                                    ),
    38                                    body=str(n))
    39         # 接收返回的数据
    40         while self.response is None:
    41             self.connection.process_data_events()
    42         return int(self.response)
    43 
    44 client = Client()
    45 
    46 print " [*] Requesting fib(30)"
    47 response = client.request(30)
    48 print " [*] Got fib(30)= %r" % response

    server.py

     1 #!/usr/bin/env python
     2 # -*- coding: utf-8 -*-
     3 import pika
     4 
     5 hostname = '192.168.1.133'
     6 parameters = pika.ConnectionParameters(hostname)
     7 connection = pika.BlockingConnection(parameters)
     8 
     9 # 创建通道
    10 channel = connection.channel()
    11 # 声明队列
    12 channel.queue_declare(queue='count_queue')
    13 print ' [*] Waiting for n'
    14 
    15 # 算法
    16 def fib(n):
    17     if n == 0:
    18         return 0
    19     elif n == 1:
    20         return 1
    21     else:
    22         return fib(n - 1) + fib(n - 2)
    23 
    24 # 设置回调函数
    25 def on_request(ch, method, props, body):
    26     n = int(body)
    27     response = fib(n)
    28     print " [*] fib(%s)" % n, '=', response
    29     # 将结果反馈
    30     ch.basic_publish(exchange='',
    31                      routing_key=props.reply_to,
    32                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
    33                      body=str(response))
    34     ch.basic_ack(delivery_tag=method.delivery_tag)
    35 
    36 #公平调度
    37 channel.basic_qos(prefetch_count=1)
    38 channel.basic_consume(on_request, queue='count_queue')
    39 print " [*] Awaiting count_queue requests"
    40 channel.start_consuming()

    结果如下:

    服务端:                                                                         客户端:

                                     

  • 相关阅读:
    利用Python进行数据分析笔记-时间序列(时区、周期、频率)
    形象易懂讲解算法I——小波变换
    小波变换与傅里叶变换的区别
    Thinkpad E550 开启 Legacy Only
    Thinkpad E550 开启 虚拟化
    常见音频接口
    IAR embedded Workbench for ARM 8.32.1 安装包
    stm32f767 无操作系统 LwIP 移植 (一)
    stm32f767 无操作系统 LwIP 移植 (二)
    北京市电力公司
  • 原文地址:https://www.cnblogs.com/jfl-xx/p/7346464.html
Copyright © 2011-2022 走看看