zoukankan      html  css  js  c++  java
  • python RabbitMQ消息队列

    RabbitMQ

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等。

    与以下两者不同的是,rabbitMq 可以跨进程。

    线程queue:同一进程下不同线程之间的交互。

    进程queue:父进程与子进程进行交互,或者同属于同一父进程下多个子进程进行交互。不同进程之间不能交互。

    基本操作

    RabbitMQ可以同时维护很多的队列,生产者可以把消息放到不同的队列发送给消费者。

    send端

    import pika
    #相当于声明一个socket
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    #声明一个管道
    channel = conn.channel()
    
    #声明queue
    channel.queue_declare(queue='hello')
    
    channel.basic_publish(exchange='',routing_key='hello',body='Hello World!') # routing_key 消息的key  body 消息的内容
    
    print('Sent "hello world"')
    
    conn.close()
    

     运行结果

    Sent "hello world"
    

     receive端

    import pika
    #相当于声明一个socket
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    
    
    #声明一个管道
    channel = conn.channel()
    
    #声明queue  这里可以不用声明,但是如果消费者先运行,又不希望出错,就要消费者先运行
    channel.queue_declare(queue='hello')
    
    def callback(ch,method,properties,body):
        print('[x] Received %r' % body )
    
    channel.basic_consume(callback, queue='hello',no_ack=True  ) #消费消息 如果收到消息就调用CALLBACK函数处理
    
    print('[*] Waiting for message.To exit press CTRL+C')
    
    channel.start_consuming() #开始收消息
    

     运行结果

    [*] Waiting for message.To exit press CTRL+C
    [x] Received b'Hello World!'
    

     一个生产者对应多个消费者

    代码同上,启动三个消费者和一个生产者,第一个消费者会接收到生产者的第一个消息,第二个消费者会接收到生产者的第二个消息,这样依次轮训。

    如果某一个消费者在处理消息的过程中,断电或者当机了,那么这个消息状态需要确认。消费者处理完成后发状态给生产者,生产者把消息从队列里删除。

    no_ack=True (消费者端参数)

    表示不确认消息状态,生产者不关心消费者状态。 如果把这个参数去掉,那么生产者需要得到消费者的回应状态,比如我启动了3个消费者和一个生产者,生产者需要得到消费者的回应。如果第一个消费者得到消息,中断了连接,那么消息会发送到第二个消费者,依次类推,直到消费者给生产者一个状态,生产者才会把消息从队列里删除。

    这个状态要手动发送给生产者:

    ch.basic_ack(delivery_tag=method.delivery_tag)
    

     生产者接收到状态就会把消息从队列里面删除。

    消息持久化

    从上面我们知道,消费者在发生当机或者其它情况,只要没有把状态返回给生产者,那么这个消息一直都在。如果生产者当机了怎么办?消息还存不存在了呢?

    如果生产者当机,那么之前存的消息都会丢失。为了避免这种情况,那么就要要求数据持久化。

    队列持久化

    durable=True

    在声明队列的时候 同时声明队列持久化。

    channel.queue_declare(queue='hello',durable=True)
    

     

    这里是队列持久化了,但是消息还没有了。

    数据持久化

    在生产者发送消息的一端加上以上参数。

     properties=pika.BasicProperties(
                              delivery_mode=2, 
                          )
    

     

    查看hello2的消息。

  • 相关阅读:
    BestCoder17 1001.Chessboard(hdu 5100) 解题报告
    codeforces 485A.Factory 解题报告
    codeforces 485B Valuable Resources 解题报告
    BestCoder16 1002.Revenge of LIS II(hdu 5087) 解题报告
    codeforces 374A Inna and Pink Pony 解题报告
    codeforces 483B Friends and Presents 解题报告
    BestCoder15 1002.Instruction(hdu 5083) 解题报告
    codeforces 483C.Diverse Permutation 解题报告
    codeforces 483A. Counterexample 解题报告
    NSArray中地内存管理 理解
  • 原文地址:https://www.cnblogs.com/qing-chen/p/7698416.html
Copyright © 2011-2022 走看看