zoukankan      html  css  js  c++  java
  • 【python】-- RabbitMQ RPC模型

    RabbitMQ RPC模型

    RPC(remote procedure call)模型说通俗一点就是客户端发一个请求给远程服务端,让它去执行,然后服务端端再把执行的结果再返回给客户端。

    1、服务端

    import pika
    
    #创建socket实例,声明管道,声明queue
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="rpc_queue")
    
    
    def fib(n):
        """
        斐波那契数列
        :param n:
        :return:
        """
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    
    
    def on_request(ch, method, props, body):  # props 是客户端发过来的消息
        n = int(body)
        print("fib(%s)" % n)
        response = fib(n)
        # 发布消息
        ch.basic_publish(exchange="",
                         routing_key=props.reply_to,  # props.reply_to从客户端取出双方约定好存放返回结果的queue
                         properties=pika.BasicProperties  # 定义一些基本属性
                         (correlation_id=props.correlation_id),  # props.correlation_id 从客户端取出当前请求的ID返回给客户端做验证
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认消息被消费
    
    channel.basic_qos(prefetch_count=1)  # 每次最多处理一个客户端发过来的消息
    # 消费消息
    channel.basic_consume(on_request,  # 回调函数
                          queue="rpc_queue")
    
    print("Awaiting RPC requests")
    channel.start_consuming()
    

      

    2、客户端

    import pika
    import uuid
    import time
    
    
    class FibonacciRpcClient(object):
        """
        斐波那契数列rpc客户端
        """
    
        def __init__(self):
            """
            定义好创建socket实例、声明管道、声明随机产生的唯一queue、消费信息的静态变量
            """
            self.connection = pika.BlockingConnection(pika.ConnectionParameters
                                                      (host="localhost"))
            self.channel = self.connection.channel()
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
            self.channel.basic_consume(self.on_response, no_ack=True,
                                       queue=self.callback_queue)
    
        def on_response(self, ch, method, props, body):
            print("---->", method, props)
            # 当服务端返回的id跟当初请求的id一致时,再去读取服务端发送的信息保持数据的一致性
            if self.corr_id == props.correlation_id:  # 当服务端返回的id跟当初请求的id一致时,保持数据的一致性
               self.response = body
    
    
        def call(self,n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.publish(exchange="",
                                 routing_key="rpc_queue",  # 双方的request所用的queue
                                 properties=pika.BasicProperties(  # 定义基本属性
                                     reply_to=self.callback_queue,  # 定义客户端服务端双方response的所用的Q
                                     correlation_id=self.corr_id),  # 定义这次request的唯一ID
                                 body=str(n))
            while self.response is None:
                self.connection.process_data_events()  # 非 阻塞版的start_consumer()
                print("no msg....")
                time.sleep(0.5)
            return int(self.response)
    
    if __name__ == "__main__":
        fibonacci_rpc = FibonacciRpcClient()
        print("Requesting fib(8)")
        response = fibonacci_rpc.call(8)
        print("Got %r" % response)
    

    3、输出

    服务端:

     Awaiting RPC requests
     fib(8)
    

      

    客户端:

    Requesting fib(8)
    no msg....
    ----> <Basic.Deliver(['consumer_tag=ctag1.cf2e7983c7d840db8c68f4571472c18d', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=amq.gen-ezXgs0tRO5SldZeRH97VPw'])> <BasicProperties(['correlation_id=601e860d-c93d-4c94-959a-3a39be177f7c'])>
    no msg....
    Got 21

      

  • 相关阅读:
    详述@Responsebody和HTTP异步请求的关系
    利用synchronized解析死锁的一种形成方式
    初识Spring JdbcTemplate
    初识SpringIOC
    JasperReport框架使用教程(附带常见空白页问题说明)
    LeetCode~1033.移动石子直到连续
    LeetCode~941.有效的山脉数组
    LeetCode~344. 反转字符串
    Job for network.service failed because the control process exited with error code问题
    LeetCode~报数(简单)
  • 原文地址:https://www.cnblogs.com/Keep-Ambition/p/8054285.html
Copyright © 2011-2022 走看看