zoukankan      html  css  js  c++  java
  • python rabbitMQ 实现RPC

    客户端:

    '''
    RPC: remote procedure call
    
    远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
    RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
    有多种 RPC模式和执行。最初由 Sun 公司提出。IETF ONC 宪章重新修订了 Sun 版本,使得 ONC RPC 协议成为 IETF 标准协议。现在使用最普遍的模式和执行是开放式软件基础的分布式计算环境(DCE)
    
    简单来说, 一个服务既是生产者又是消费者
    '''
    import  pika
    import uuid
    import time
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            self.channel = self.connection.channel()
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
            #声明接收的queue
            self.channel.basic_consume(self.on_response,no_ack=True,queue=self.callback_queue)
    
        def on_response(self,ch,method,props,body):
            #只有接收的id是自己生成的id时才会将body 赋值给 response  ,防止接收到其他客户端的结果
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self,n):
            self.response=None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(reply_to=self.callback_queue,#告诉服务器 客户端监听返回值的queue 的name
                                                                       correlation_id=self.corr_id),
                                       body=str(n))
    
            while self.response is None:
                #接收消息
               # self.channel.start_consuming() 与self.connection.process_data_events() 的区别是前者会阻塞 而后者不会阻塞
                self.connection.process_data_events()
                print("no message")
                time.sleep(0.5)
            return int(self.response)
    
    fibonacci_rpc  = FibonacciRpcClient()
    
    print("request fib(30)")
    response = fibonacci_rpc.call(6)
    print("result:",response)

    服务器:

    '''
    接收client 消息, 处理 , 返回
    
    '''
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n==0:
            return 0
        elif n==1:
            return 1
        else:
            return fib(n-1)+fib(n-2)
    
    def on_request(ch,method,props,body):
        n = int(body)
        print("fib : ",n)
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to, #服务器端将消息发送到客户端监听的queue中, 这个参数是客户端传递过来的
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),body = str(response))
    
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request,queue='rpc_queue')
    print('wait rpc request')
    channel.start_consuming()
  • 相关阅读:
    Scala进阶之路-idea下进行spark编程
    Scala进阶之路-Spark本地模式搭建
    Scala进阶之路-Scala高级语法之隐式(implicit)详解
    Scala进阶之路-Spark底层通信小案例
    Scala进阶之路-并发编程模型Akka入门篇
    Scala进阶之路-统计商家id的标签数以及TopN示例案例分析
    Scala进阶之路-Scala中的泛型介绍
    Scala进阶之路-尾递归优化
    Scala进阶之路-Scala特征类与unapply反向抽取
    Java基础-爬虫实战之爬去校花网网站内容
  • 原文地址:https://www.cnblogs.com/gaizhongfeng/p/8099310.html
Copyright © 2011-2022 走看看