zoukankan      html  css  js  c++  java
  • Python-RabbitMQ-RPC(非阻塞版)

    服务器端:rpc_server.py

    import pika,time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n -2)
    
    
    def on_request(ch, method, props, body):
        n = int(body)
    
        print("[.] fib(%s)" % n)
        response = fib(n)#斐波那契的执行结果赋值给reponse
        #再把得到的消息发回给客户端
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties = pika.BasicProperties(
                             correlation_id= 
                                props.correlation_id
                         ),
                         body =str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)#确保消息被消费,代表任务完成
    
    
    #channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request,
                          queue='rpc_queue')
    
    print(" [x] Awaiting RPC request")
    channel.start_consuming()
    

    客户端:rpc_client.py

    import pika,sys,uuid
    import time
    
    
    class FibonacciRpcClient(object):
        def __init__(self):
    
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue#获取queue名字
    
            self.channel.basic_consume(self.on_response,#只要收到就调用on_response()
                                       no_ack=True,
                                       queue=self.callback_queue
                                       )
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:#判断服务器端corr_id和本地corr_id相等,才往下走
                self.response = body#response收到body的消息表示response不为空
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,#指定返回到那个queue
                                           correlation_id=self.corr_id,
                                       ),
                                       body=str(n))#传字符串,把30传进来
            while self.response is None:
                #收到消息,就会触发on_response(),没消息就继续往下走循环
                self.connection.process_data_events()#非阻塞版的start_consuming
                print("no msg...")#只要走到这,就相当于没消息
                time.sleep(0.5)
            return int(self.response)
    
    fibonacci_rpc = FibonacciRpcClient()
    
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(8)
    print(" [.] Got %r" % response)
    
  • 相关阅读:
    获取连接无线路由客户机信息命令
    HTB进行流量控制方法
    exec函数族用法
    java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    struts2从请求取值的三种方式
    用jsp写的网页 怎么在传递参数时包含中文?
    Struts2使用DoubleSelect实现二级级联下拉框省份城市
    MySQL里主键与外键的关系
    查看struts2源码
    WIN7系统下,用笔记本发送WIFI信号让手机无线上网!
  • 原文地址:https://www.cnblogs.com/fuyuteng/p/9263764.html
Copyright © 2011-2022 走看看