zoukankan      html  css  js  c++  java
  • RabbitMQ

    简单模式 

    The Python code based on pika==1.0.0 version

    producer:
      channel.basic_publish(
        exchange = ``,
        routing_key = 'hello',
        body = “世界你好!”
      )
    	- 这种交换是特殊的‒它使我们可以准确地指定消息应进入的队列。队列名称需要在routing_key参数中指定.
    
    consumer:
      channel.basic_consume(
        queue='hello', 
        on_message_callback=callback,
        auto_ack=True
      )
    

      

    Putting it all together

    send.py

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')  # 队列的声明是幂等的
    
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()
    

    receive.py

    #!/usr/bin/env python
    import pika, sys, os
    
    def main():
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        channel = connection.channel()
    
        channel.queue_declare(queue='hello')
    
        def callback(ch, method, properties, body):
            print(" [x] Received %r" % body)
    
        channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    
        print(' [*] Waiting for messages. To exit press CTRL+C')
        channel.start_consuming()
    
    if __name__ == '__main__':
        try:
            main()
        except KeyboardInterrupt:
            print('Interrupted')
            try:
                sys.exit(0)
            except SystemExit:
                os._exit(0)
    

      

     消息确认机制

     铺垫:同生产者消费者模型,消费者在处理消息后要向队列发送消息回执,表明该消息已被消息掉。

     官网解释:bool auto_ack: if set to True, automatic acknowledgement mode will be used

     该参数默认为False,意味着RabbitMQ会消耗越来越多的内存,因为它无法释放那些未被确认的消息。

     当设置为True时: 如果一个消费者在处理消息过程中挂掉了,那么这个消息就丢失了,并且还丢失了发送给此消费者所有尚未处理的消息。

     这种情况肯定不是我们想要的,当一个消费者挂掉,如果能重新把消息发给其他消费者,这样,我们的消息就不会丢失任何消息了。要如何做呢? (消息持久化+工作队列+任务派遣)

    消息持久化

    当RabbitMQ服务挂掉时,队列和消息都会丢失,要想不丢失,需要做持久化。

    # 队列持久化
    channel.queue_declare(queue='hello', durable=True)  
    
    # 消息持久化
    发送端:
      channel.basic_publish(
    	  exchange='',
    	  routing_key='task_queue',
    	  body=message,
    	  properties=pika.BasicProperties(
    	  delivery_mode=2, # make message persistent
    	  ))
    
    消费端:
    def callback(ch, method, properties, body):
    	print(" [x] Received %r" % body.decode())
    	ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

    工作队列

    工作队列又叫任务队列

    使用场景:当一个消费者处理一条消息很耗时,我又不想等待它完成,想把消息分发给其他消费者时

    准备:一个生产者、多个消费者

    MQ将按顺序地将每个消息发送给下一个消费者,平均而言,每个消费者都会收到相同数量的消息,这种分发方式称为循环。

    以循环方式分发消息仍然不能解决上述场景的问题,理想情况是公平的分发消息(在消费者忙碌时就把消息发给不忙的消费者)

    公平派遣/公平分发

    为了克服这个问题,我们可以将 Channel#basic_qos通道方法与 prefetch_count = 1设置一起使用。

    使用basic.qos协议方法来告诉RabbitMQ一次不向消费者发送多条消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给消费者。而是将其分派给不忙的下一个消费者。

    channel.basic_qos(prefetch_count = 1)
    

    关于队列大小的注意事项

    如果所有工作人员都忙,您的队列就满了。您将需要注意这一点,并可能增加更多的工作人员,或使用消息TTL

    Putting it all together

     send.py

    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [x] Sent %r" % message)
    connection.close()
    

    receive.py

    #!/usr/bin/env python
    import pika
    import time
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    
    channel.start_consuming()
    

     

      

  • 相关阅读:
    洛谷p1056
    __int64
    杭电2057
    4.4清北学堂Day1 主要内容:数论,数学
    递推的一点理解
    高精度减法
    高精度加法
    p1184高手之在一起
    对于rqy今天讲座的一些理解和看法吧
    PHP.21-商品信息管理
  • 原文地址:https://www.cnblogs.com/liuwei0824/p/14685017.html
Copyright © 2011-2022 走看看