RabbitMQ是一个消息中间件——接收和发送消息。你可以把它想象成一个邮局,当你把邮件投递到邮箱后,你就可以确信邮递员最终会帮你把邮件寄给收件人。
术语:
生产者——发送消息;
队列:“邮箱”,存在于RabbitMQ内。虽然消息可以在RabbitMQ及应用中流转,但是只能在队列中存储,队列没有限度,可以存储任意多的消息,多个生产者可以向一个队列中发送消息,多个消费者也可以一个队列中接收消息;
消费者——接收消息;
生产者消费者消息中间件多数情况下不在同一台机器上。
基于Python客户端pika实现Hello World程序——简单的发送接收消息。
发送端:首先与RabbitMQ服务建立连接,之后在发送消息前我们需要确保接收队列存在,如果向一个不存在的队列发送消息,RabbitMQ将会丢弃。在RabbitMQ中,消息不能直接发送给队列,而是需要通过一个交换机(exchange)。现在先使用默认交换机,它允许我们通过routing_key参数精确指定要发往的队列。退出程序前,我们需要确保网络缓存被flush,消息被投递到RabbitMQ。关闭连接可以达到该效果。
接收端:首先也是连接到RabbitMQ服务,确保队列存在。使用queue_declare创建队列是等幂操作,可以调用多次,只有一次成功创建。这种重复创建是有道理的,因为我们不确定哪个程序先启动,所以在两边都创建是一个很好的经验。如果你想查看一下RabbitMQ中有什么队列以及队列中有多少消息,可以使用rabbitmqctl工具: sudo rabbitmqctl list_queues。从队列接收消息相对复杂,需要向队列注册一个回调函数。当收到消息后,回调函数会被pika库调用。为了注册成功,必须确保订阅的队列存在。最后开启一个循环,等待数据。
发送端:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
接收端:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()