zoukankan      html  css  js  c++  java
  • RabbitMQ 适用于云计算集群的远程调用(RPC)

            在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成。那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例。

    1. 客户端接口 Client interface

            为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果。代码如下:

    [python] view plain copy
     
    print?
    1. fibonacci_rpc = FibonacciRpcClient()  
    2. result = fibonacci_rpc.call(4)  
    3. print "fib(4) is %r" % (result,)  
    fibonacci_rpc = FibonacciRpcClient()
    result = fibonacci_rpc.call(4)
    print "fib(4) is %r" % (result,)

    2. 回调函数队列 Callback queue

            总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client发送请求的Message然后server返回响应结果。为了收到响应client在publish message时需要提供一个”callback“(回调)的queue地址。code如下:

    [python] view plain copy
     
    print?
    1. result = channel.queue_declare(exclusive=True)  
    2. callback_queue = result.method.queue  
    3.   
    4. channel.basic_publish(exchange='',  
    5.                       routing_key='rpc_queue',  
    6.                       properties=pika.BasicProperties(  
    7.                             reply_to = callback_queue,  
    8.                             ),  
    9.                       body=request)  
    10.   
    11. # ... and some code to read a response message from the callback_queue ...  
    result = channel.queue_declare(exclusive=True)
    callback_queue = result.method.queue
    
    channel.basic_publish(exchange='',
                          routing_key='rpc_queue',
                          properties=pika.BasicProperties(
                                reply_to = callback_queue,
                                ),
                          body=request)
    
    # ... and some code to read a response message from the callback_queue ...

    2.1 Message properties

    AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:

    • delivery_mode: 持久化一个Message(通过设定值为2)。其他任意值都是非持久化。请移步RabbitMQ消息队列(三):任务分发机制
    • content_type: 描述mime-type 的encoding。比如设置为JSON编码:设置该property为application/json。
    • reply_to: 一般用来指明用于回调的queue(Commonly used to name a callback queue)。
    • correlation_id: 在请求中关联处理RPC响应(correlate RPC responses with requests)。

    3. 相关id Correlation id

           在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。

           这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。

    4. 总结

         工作流程:

    • 当客户端启动时,它创建了匿名的exclusive callback queue.
    • 客户端的RPC请求时将同时设置两个properties: reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.
    • 请求将被发送到an rpc_queue queue.
    • RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue回复一个message给client。
    • client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应。

    5. 最终实现

    The code for rpc_server.py:

    [python] view plain copy
     
    print?
    1. #!/usr/bin/env python  
    2. import pika  
    3.   
    4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
    5.         host='localhost'))  
    6.   
    7. channel = connection.channel()  
    8.   
    9. channel.queue_declare(queue='rpc_queue')  
    10.   
    11. def fib(n):  
    12.     if n == 0:  
    13.         return 0  
    14.     elif n == 1:  
    15.         return 1  
    16.     else:  
    17.         return fib(n-1) + fib(n-2)  
    18.   
    19. def on_request(ch, method, props, body):  
    20.     n = int(body)  
    21.   
    22.     print " [.] fib(%s)"  % (n,)  
    23.     response = fib(n)  
    24.   
    25.     ch.basic_publish(exchange='',  
    26.                      routing_key=props.reply_to,  
    27.                      properties=pika.BasicProperties(correlation_id =   
    28.                                                      props.correlation_id),  
    29.                      body=str(response))  
    30.     ch.basic_ack(delivery_tag = method.delivery_tag)  
    31.   
    32. channel.basic_qos(prefetch_count=1)  
    33. channel.basic_consume(on_request, queue='rpc_queue')  
    34.   
    35. print " [x] Awaiting RPC requests"  
    36. channel.start_consuming()  
    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    
    def on_request(ch, method, props, body):
        n = int(body)
    
        print " [.] fib(%s)"  % (n,)
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id = 
                                                         props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')
    
    print " [x] Awaiting RPC requests"
    channel.start_consuming()
    


    The server code is rather straightforward:

    • (4) As usual we start by establishing the connection and declaring the queue.
    • (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, it's probably the slowest recursive implementation possible).
    • (19) We declare a callback for basic_consume, the core of the RPC server. It's executed when the request is received. It does the work and sends the response back.
    • (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set theprefetch_count setting.

    The code for rpc_client.py:

    [python] view plain copy
     
    print?
    1. #!/usr/bin/env python  
    2. import pika  
    3. import uuid  
    4.   
    5. class FibonacciRpcClient(object):  
    6.     def __init__(self):  
    7.         self.connection = pika.BlockingConnection(pika.ConnectionParameters(  
    8.                 host='localhost'))  
    9.   
    10.         self.channel = self.connection.channel()  
    11.   
    12.         result = self.channel.queue_declare(exclusive=True)  
    13.         self.callback_queue = result.method.queue  
    14.   
    15.         self.channel.basic_consume(self.on_response, no_ack=True,  
    16.                                    queue=self.callback_queue)  
    17.   
    18.     def on_response(self, ch, method, props, body):  
    19.         if self.corr_id == props.correlation_id:  
    20.             self.response = body  
    21.   
    22.     def call(self, n):  
    23.         self.response = None  
    24.         self.corr_id = str(uuid.uuid4())  
    25.         self.channel.basic_publish(exchange='',  
    26.                                    routing_key='rpc_queue',  
    27.                                    properties=pika.BasicProperties(  
    28.                                          reply_to = self.callback_queue,  
    29.                                          correlation_id = self.corr_id,  
    30.                                          ),  
    31.                                    body=str(n))  
    32.         while self.response is None:  
    33.             self.connection.process_data_events()  
    34.         return int(self.response)  
    35.   
    36. fibonacci_rpc = FibonacciRpcClient()  
    37.   
    38. print " [x] Requesting fib(30)"  
    39. response = fibonacci_rpc.call(30)  
    40. print " [.] Got %r" % (response,)  
    #!/usr/bin/env python
    import pika
    import uuid
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host='localhost'))
    
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(self.on_response, no_ack=True,
                                       queue=self.callback_queue)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                             reply_to = self.callback_queue,
                                             correlation_id = self.corr_id,
                                             ),
                                       body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    
    fibonacci_rpc = FibonacciRpcClient()
    
    print " [x] Requesting fib(30)"
    response = fibonacci_rpc.call(30)
    print " [.] Got %r" % (response,)


    The client code is slightly more involved:

    • (7) We establish a connection, channel and declare an exclusive 'callback' queue for replies.
    • (16) We subscribe to the 'callback' queue, so that we can receive RPC responses.
    • (18) The 'on_response' callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id is the one we're looking for. If so, it saves the response inself.response and breaks the consuming loop.
    • (23) Next, we define our main call method - it does the actual RPC request.
    • (24) In this method, first we generate a unique correlation_id number and save it - the 'on_response' callback function will use this value to catch the appropriate response.
    • (25) Next, we publish the request message, with two properties: reply_to and correlation_id.
    • (32) At this point we can sit back and wait until the proper response arrives.
    • (33) And finally we return the response back to the user.

    开始rpc_server.py:

    [python] view plain copy
     
    print?
    1. $ python rpc_server.py  
    2.  [x] Awaiting RPC requests  
    $ python rpc_server.py
     [x] Awaiting RPC requests

    通过client来请求fibonacci数:

    [python] view plain copy
     
    print?
    1. $ python rpc_client.py  
    2.  [x] Requesting fib(30)  
    $ python rpc_client.py
     [x] Requesting fib(30)

          现在这个设计并不是唯一的,但是这个实现有以下优势:

    • 如何RPC server太慢,你可以扩展它:启动另外一个RPC server。
    • 在client端, 无所进行加锁能同步操作,他所作的就是发送请求等待响应。

          我们的code还是挺简单的,并没有尝试去解决更复杂和重要的问题,比如:

    • 如果没有server在运行,client需要怎么做?
    • RPC应该设置超时机制吗?
    • 如果server运行出错并且抛出了异常,需要将这个问题转发到client吗?
    • 需要边界检查吗?

    转载自: anzhsoft: http://blog.csdn.net/anzhsoft/article/details/19633107

    参考资料:

    1. http://www.rabbitmq.com/tutorials/tutorial-six-python.html

  • 相关阅读:
    c#动态类型
    [转]鼠标和键盘模拟API
    c#各类型转byte[]或转回
    Unity3D发布安卓报错permisson denied的解决
    NuGet修改packages目录/迁移缓存文件夹
    数据结构:单向链表系列7--交换相邻两个节点2(交换链域/指针域)
    数据结构:单向链表系列6--交换相邻两个节点1(交换数据域)
    数据结构:单向链表系列5--在链表中查找元素
    数据结构:单向链表系列4--获取链表长度(迭代法和递归法)
    数据结构:单向链表系列3--删除节点
  • 原文地址:https://www.cnblogs.com/liyongbin/p/8193241.html
Copyright © 2011-2022 走看看