zoukankan      html  css  js  c++  java
  • Python通过RabbitMQ实现RPC

    Client端代码:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import pika
    import uuid
    import time
    
    
    class FibonacciRpcClient(object):
        def __init__(self):
            #生成socket
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            #生成管道
            self.channel = self.connection.channel()
            #声明一个随机queue,exclusive=True会在此queue的消费者断开后,自动将queue删除
            result = self.channel.queue_declare(exclusive=True)
            #获取随机queue名
            self.callback_queue = result.method.queue
            #定义收到消息后的动作
            self.channel.basic_consume(self.on_response,       #回调函数on_response
                                       no_ack=True,
                                       queue=self.callback_queue)    #获取随机queue名
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:  #判断uuid是否是否一致
                self.response = body        #队列返回
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4()) #生成uuid,等会发送给服务端
            #发送消息给服务端
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',   #路由键
                                       properties=pika.BasicProperties(reply_to=self.callback_queue,  #告诉服务端将返回发到哪个队列
                                                                       correlation_id=self.corr_id),
                                       body=str(n))    #发送的消息
            while self.response is None:
                self.connection.process_data_events()   #非阻塞版的start_consuming(),如果收到消息就执行on_response回调函数
                print("no msg....")
                time.sleep(0.5)         #这里可以执行其他命令
            return int(self.response)   #返回结果
    
    
    #生成实例
    fibonacci_rpc = FibonacciRpcClient()
    
    print("[x] Requesting fib(30)")
    
    #调用call函数
    response = fibonacci_rpc.call(30)
    
    print("[x] got %r " % response)

    server端代码:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import pika
    import time
    
    #生成socket
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    #生成管道
    channel = connection.channel()
    #声明一个queue防止启动报错
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    
    
    def on_request(ch, method, props, body):
        n = int(body)
    
        print("[.] fib(%s)" % n)
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=str(response))
    
        ch.basic_ack(delivery_tag=method.delivery_tag)  #回复确认消息
    
    
    
    #处理完这条再发下一条
    channel.basic_qos(prefetch_count=1)
    #定义收到消息动作
    channel.basic_consume(on_request,queue='rpc_queue')
    
    channel.start_consuming()
  • 相关阅读:
    idea html,js修改不用重启进程
    opencv rtsp 人脸识别
    The system is running in low-graphics mode UB16
    阿里云ecs 增加虚拟网卡
    rtsp
    mysql5.7报err 1055错误 sql_mode=only_full_group_by
    python 生成requirements.txt
    Linux 保护文件 不给修改
    logback logback.xml常用配置详解(三) <filter>
    logback 常用配置详解(二) <appender>
  • 原文地址:https://www.cnblogs.com/Template/p/9615839.html
Copyright © 2011-2022 走看看