zoukankan      html  css  js  c++  java
  • RabbitMQ远程执行任务RPC。

    如果想发一条命令给远程机器,再把结果返回

    这种模式叫RPC:远程过程调用

    发送方将发送的消息放在一个queue里,由接收方取。

    接收方再把执行结果放在另外一个queue里,由发送方取

    实际上,发送方把1,随机生成的接收queue名 2,UUID值发过去了(用于确认)

    客户端:作用是发送指令

    '''
    RPC客户端代码
    '''
    import pika
    import uuid
    import time
    
    class FibonacciRpcClient(object):
        def __init__(self):
            # 建连接
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            # 建频道
            self.channel = self.connection.channel()
            # 声明随机queue,不用了自动删除
            result = self.channel.queue_declare(exclusive=True)
            # 获取随机queue名称
            self.callback_queue = result.method.queue
            # 准备消费者,从随机queue中消费信息交给on_response处理
            # 声明了我要收了
            self.channel.basic_consume(self.on_response,
                                       no_ack=True,
                                       queue=self.callback_queue)
        def on_response(self, ch, method, props, body):
            # 自己发的ID和客户端返回的ID一样才执行
            # 只有这个相等,才说明这个结果是我发过去的命令的结果。Perfect!
            if self.corr_id == props.correlation_id:
                self.response = body
        def call(self, n):
            self.response = None
            # 生成的是唯一字符串,每次都不一样,将这个发给服务器端
            self.corr_id = str(uuid.uuid4())
            # 发消息到rpc_queue里,可以看出,发的时候参数里把回调queue名给服务器了
            # 这样服务器就知道向哪个queue里发,带的参数是灵活的queue,服务器计算结果放到该queue
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=self.corr_id
                                       ),
                                       body=str(n))
            # 发了消息就该收结果了,但没有写start_consuming方法(该方法会阻塞),
            # 这里self.connection.process_data_events()相当于非阻塞型的start_consuming
            # self.response初始化为None
            while self.response is None:
                self.connection.process_data_events() # 收到消息就会触发on_response函数
                print("no message...")
                time.sleep(0.5) # 这里其实可以不sleep,可以再发个命令。后发的这个可能先执行,不用UUID就不知道哪个是哪个
            return int(self.response)
    
    fibonacci_rpc = FibonacciRpcClient()
    print("Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print("response:", response)

    服务器端:接收指令并把处理结果返回

    '''
    RPC服务器端代码
    '''
    import pika
    
    # 建连接
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    # 建频道
    channel = connection.channel()
    # 声明queue,先启动服务器,客户端就不用声明rpc_queue了
    channel.queue_declare(queue='rpc_queue')
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    def on_request(ch, method, props, body):
        n = int(body)
        print("fib({0}):".format(n))
        response = fib(n)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(
                             # 这个是客户端发过来的correlation_id,再发回去
                             correlation_id=props.correlation_id
                         ),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')
    print("Awaiting RPC requests")
    channel.start_consuming()

    运行结果:

    '''
    先启动服务端:
    Awaiting RPC requests
    
    再启动客户端:
    Requesting fib(30)
    no message...
    no message...
    no message...
    no message...
    response: 832040
    
    再看服务器端:
    Awaiting RPC requests
    fib(30):
    
    '''
  • 相关阅读:
    easyui tree loader用法
    mysql字符集
    mysql 内连接 左连接 右连接 外连接
    mysql 聚集函数和分组
    mysql 大数据量求平均值
    C++ 纯虚方法
    Windows xcopy
    服务端数据库的操作如何不阻塞
    分布式系统业务服务器的设计
    mysql 查询执行的流程
  • 原文地址:https://www.cnblogs.com/staff/p/9944450.html
Copyright © 2011-2022 走看看