zoukankan      html  css  js  c++  java
  • RabbitMQ

    消息属性
    AMQP 0-9-1协议预定义了消息附带的14个属性集。除以下内容外,大多数属性很少使用:
    
    delivery_mode:将消息标记为持久性(值为2)或瞬态(任何其他值)。可能还记得第二个教程中的此属性。
    content_type:对于经常使用的JSON编码,将此属性设置为application / json是一个好习惯。
    reply_to:通常用于命名回调队列。
    related_id:用于将RPC响应与请求相关联。在队列中收到响应后,尚不清楚响应属于哪个请求。就是使用correlation_id属性的时候。每个请求的唯一值。
    

    Putting it all together

    server.py

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    
    def on_request(ch, method, props, body):
        n = int(body)
    
        print(" [.] fib(%s)" % n)
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id = 
                                                             props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    
    • 我们可能要运行多个服务器进程。为了将负载平均分配到多个服务器上,我们需要设置 prefetch_count设置。

    client.py

    #!/usr/bin/env python
    import pika
    import uuid
    
    class FibonacciRpcClient(object):
    
        def __init__(self):
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host='localhost'))
    
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(queue='', exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(
                queue=self.callback_queue,
                on_message_callback=self.on_response,
                auto_ack=True)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(
                exchange='',
                routing_key='rpc_queue',
                properties=pika.BasicProperties(
                    reply_to=self.callback_queue,
                    correlation_id=self.corr_id,
                ),
                body=str(n))
            while self.response is None:
                self.connection.process_data_events()  # 该方法定时自动向RabbitMQ的消息代理发送心跳包来维持当前连接
            return int(self.response)
    
    
    fibonacci_rpc = FibonacciRpcClient()
    
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)
    

      

  • 相关阅读:
    斐波那契数列 的两种实现方式(Java)
    单链表反转
    单链表合并
    两个有序list合并
    list去重 转载
    RemoveAll 要重写equals方法
    Java for LeetCode 138 Copy List with Random Pointer
    Java for LeetCode 137 Single Number II
    Java for LeetCode 136 Single Number
    Java for LeetCode 135 Candy
  • 原文地址:https://www.cnblogs.com/liuwei0824/p/14736377.html
Copyright © 2011-2022 走看看