zoukankan      html  css  js  c++  java
  • Python自动化之rabbitmq rpc client端代码分析(原创)

    RPC调用client端解析

    import pika
    import uuid
    # 建立连接
    class FibonacciRpcClient(object):
    
    
        def __init__(self):
    
            # 建立建立连接和通道
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
            self.channel = self.connection.channel()
    
            # exclusive(专有的): Only allow access by the current connection
            result = self.channel.queue_declare(exclusive=True)
    
            # 获取队列名称
            self.callback_queue = result.method.queue
    
            # 接收服务端的回应
            # param on_response:The function for dispatching messages to user, having the signature:
            """
            Start a queue consumer.
            This method asks the server to start a "consumer",
            which is a transient request for messages from a specific queue.
            Consumers last as long as the channel they were declared on, or until the client cancels them.
            """
            self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
    
            # 接收到返回消息的处理方法消息
        def on_response(self, ch, method, props, body):
                if self.corr_id == props.correlation_id:
                    self.response = body
    
        def call(self, n):
                self.response = None
                self.corr_id = str(uuid.uuid4())
    
                """This method publishes a message to a specific exchange.
                The message will be routed to queues as defined by the exchange configuration
                and distributed to any active consumers when the transaction, if any, is committed."""
                self.channel.basic_publish(exchange="",
                                           routing_key="rpc_queue",
                                           properties=pika.BasicProperties(
                                           reply_to=self.callback_queue, correlation_id=self.corr_id),
                                           body=str(n)
                                           )
                # 确认是否有收到消息,没有的话阻塞在这里
                    # Will make sure that data events are processed. Dispatches timer and
                    # channel callbacks if not called from the scope of BlockingConnection or
                    # BlockingChannel callback. Your app can block on this method.
                    # while self.response is None:  # 跟start_consuming相似
                    # 是一个等待消息的阻塞过程,连接的任何消息都可以使它脱离阻塞状态(有点像Ajax的事件等待机制)
                while self.response is None:
                    self.connection.process_data_events()
                return int(self.response)
    
    ssh_rpc = FibonacciRpcClient()
    
    response = ssh_rpc.call(30)
    
    
    
    
                    # Processes(处理) I/O events and dispatches timers and `basic_consume`
                    # callbacks until all consumers are cancelled."""
                    # 循环接收我们的消息,接收之后并执行我们的callback函数
             
                    channel.start_consuming()
  • 相关阅读:
    城市的划入划出效果
    文本溢出省略解决笔记css
    长串英文数字强制折行解决办法css
    Poj 2352 Star
    树状数组(Binary Indexed Trees,二分索引树)
    二叉树的层次遍历
    Uva 107 The Cat in the Hat
    Uva 10336 Rank the Languages
    Uva 536 Tree Recovery
    Uva10701 Pre, in and post
  • 原文地址:https://www.cnblogs.com/wspblog/p/5973497.html
Copyright © 2011-2022 走看看