zoukankan      html  css  js  c++  java
  • RabbitMQ中RPC的实现及其通信机制

    RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返回到对应的correlation_id。

    RPC调用流程:

    当生产者启动时,它会创建一个匿名的独占回调队列,对于一个RPC请求,生产者发送一条具有两个属性的消息:reply_to(回调队列),correlation_id(每个请求的唯一值),请求被发送到rpc_queue队列,消费者等待该队列上的请求。当一个请求出现时,它会执行该任务,将带有结果的消息发送回生产者。生产者等待回调队列上的数据,当消息出现时,它检查相关ID属性,如果它与请求中的值匹配,则返回对应用程序的响应。

     RabbitMQ斐波拉契计算的RPC,消费者实现:

    """
    基于RabbitMQ实现RPC通信机制 --> 服务端
    """
    
    import pika
    import uuid
    from functools import lru_cache
    
    
    class RabbitServer(object):
        def __init__(self):
            self.conn = pika.BlockingConnection(
                pika.ConnectionParameters(host='localhost', port=5672)
            )
            self.channel = self.conn.channel()
    
            # 声明一个队列,并进行持久化,exclusive设置为false
            self.channel.queue_declare(
                exclusive=False, durable=True, queue='task_queue'
            )
    
            # 声明一个exhange交换机,类型为topic
            self.channel.exchange_declare(
                exchange='logs_rpc', exchange_type='topic', durable=True
            )
    
            # 将队列与交换机进行绑定
            routing_keys = ['#']  # 接受所有的消息
            for routing_key in routing_keys:
                self.channel.queue_bind(
                    exchange='logs_rpc', queue='task_queue', routing_key=routing_key
                )
    
        @lru_cache()
        def fib(self, n):
            """
            斐波那契数列.===>程序的处理逻辑
            使用lru_cache 优化递归
            :param n:
            :return:
            """
            if n == 0:
                return 0
            elif n == 1:
                return 1
            else:
                return self.fib(n - 1) + self.fib(n - 2)
    
        def call_back(self, channel, method, properties, body):
            print('------------------------------------------')
            print('接收到的消息为(斐波那契数列的入参项为):{}'.format(str(body)))
            print('消息的相关属性为:')
            print(properties)
            value = self.fib(int(body))
            print('斐波那契数列的运行结果为:{}'.format(str(value)))
    
            # 交换机将消息发送到队列
            self.channel.basic_publish(
                exchange='',
                routing_key=properties.reply_to,
                body=str(value),
                properties=pika.BasicProperties(
                    delivery_mode=2,
                    correlation_id=properties.correlation_id,
                ))
    
            # 消费者对消息进行确认
            self.channel.basic_ack(delivery_tag=method.delivery_tag)
    
        def receive_msg(self):
            print('开始接受消息...')
            self.channel.basic_qos(prefetch_count=1)
            self.channel.basic_consume(
                consumer_callback=self.call_back,
                queue='task_queue',
                no_ack=False,  # 消费者对消息进行确认
                consumer_tag=str(uuid.uuid4())
            )
    
        def consume(self):
            self.receive_msg()
            self.channel.start_consuming()
    
    
    if __name__ == '__main__':
        rabbit_consumer = RabbitServer()
        rabbit_consumer.consume()

     生产者实现:

    """
    基于RabbitMQ实现RPC通信机制 --> 客户端
    """
    
    import pika
    import uuid
    import time
    
    
    class RabbitClient(object):
        def __init__(self):
            # 与RabbitMq服务器建立连接
            self.conn = pika.BlockingConnection(
                pika.ConnectionParameters(host='localhost', port=5672)
            )
            self.channel = self.conn.channel()
    
            # 声明一个exchange交换机,交换机的类型为topic
            self.channel.exchange_declare(
                exchange='logs_rpc', exchange_type='topic', durable=True
            )
    
            # 声明一个回调队列,用于接受RPC回调结果的运行结果
            result = self.channel.queue_declare(durable=True, exclusive=False)
            self.call_queue = result.method.queue
    
            # 从回调队列当中获取运行结果.
            self.channel.basic_consume(
                consumer_callback=self.on_response,
                queue=self.call_queue,
                no_ack=False
            )
    
        def on_response(self, channel, method, properties, body):
            """
            对收到的消息进行确认
            找到correlation_id与服务端的消息标识匹配的消息结果
            :param channel:
            :param method:
            :param properties:
            :param body:
            :return:
            """
            if self.corr_id == properties.correlation_id:
                self.response = body
                print('斐波那契数列的RPC返回结果是:{}'.format(body))
                print('相关属性信息:')
                print(properties)
            self.channel.basic_ack(delivery_tag=method.delivery_tag)
    
        def send_msg(self, routing_key, message):
            """
            exchange交换机将根据消息的路由键将消息路由到对应的queue当中
            :param routing_key: 消息的路由键
            :param message: 生成者发送的消息
            :return:
            """
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(
                exchange='logs_rpc',
                routing_key=routing_key,
                body=message,
                properties=pika.BasicProperties(
                    delivery_mode=2,
                    correlation_id=self.corr_id,
                    reply_to=self.call_queue,
                ))
    
            while self.response is None:
                print('等待远程服务端的返回结果...')
                self.conn.process_data_events()  # 非阻塞式的不断获取消息.
    
            return self.response
    
        def close(self):
            self.conn.close()
    
    
    if __name__ == "__main__":
        rabbit_producer = RabbitClient()
        routing_key = 'hello every one'
        start_time = int(time.time())
        for item in range(2000):
            num = str(item)
            print('生产者发送的消息为:{}'.format(num))
            rabbit_producer.send_msg(routing_key, num)
        end_time = int(time.time())
        print("耗时{}s".format(str(end_time - start_time)))

    计算2000以内的斐波拉契数列,执行结果如下:

  • 相关阅读:
    NestingQuery
    Repeat
    GenericQuery
    StringOpr
    RHEL5.6 安装 virtualbox
    DNS的资料总结
    drop delete truncate 区别
    Linux Shell命令ulimit的用法
    OSI及TCP/IP的概念和区别
    shell:读取文件的每一行内容并输出
  • 原文地址:https://www.cnblogs.com/FG123/p/10137411.html
Copyright © 2011-2022 走看看