Introduction
RabbitMQ is a message broker. The principal idea is pretty simple: it accepts and forwards messages. You can think about it as a post office: when you send mail to the post box you're pretty sure that Mr. Postman will eventually deliver the mail to your recipient. Using this metaphor RabbitMQ is a post box, a post office and a postman.
RabbitMQ和通常意义上的消息发送,用下面行话:
-
Producing means nothing more than sending. A program that sends messages is a producer. We'll draw it like that, with "P":
-
A queue is the name for a mailbox. It lives inside RabbitMQ. Although messages flow through RabbitMQ and your applications, they can be stored only inside a queue. A queue is not bound by any limits, it can store as many messages as you like ‒ it's essentially an infinite buffer. Many producers can send messages that go to one queue, many consumers can try to receive data from one queue. A queue will be drawn as like that, with its name above it:
-
Consuming has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages. On our drawings it's shown with "C":
helloworld例子
生产者发送消息到队列,消费者取出消息并打印。
send.py
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive.py
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
代码解释:send.py
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
与rabbitmq server建立连接,
channel.queue_declare(queue='hello')
创建一个queue,我们发送的消息会被放到这个消息队列。如果消息队列不存在,rabbitmq就会丢弃这条消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'")
在rabbitmq中,消息不能直接发送到queue中,通常会经过一个exchange。这里我们使用默认的exchange,用空串标识。
这个exchange是特殊的,它要求我们精确指定这条消息发送到哪个queue。queue的名字需要通过routing_key变量。
connection.close()
在退出程序前我们要关闭这个连接。
代码解释recv.py
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello')
首先连接到rabbitmq server,然后要确保queue存在,创建一个queue使勇queue_declare。
我们可以运行这个命令多次,但如果名字相同,仅有一个会被创建。
def callback(ch, method, properties, body): print(" [x] Received %r" % body)
从队列中接收消息是比较复杂的,要为queue订阅一个callback函数。当接收到一个消息,callback函数就会被调用。
我们这个回调函数打印消息内容到屏幕上。
channel.basic_consume(callback, queue='hello', no_ack=True)
下一步,告诉rabbitmq特殊的callback函数应该从我们的hello队列接收消息。
print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
最后,开启一个无限的循环等待数据并在必要的时候运行callback函数。
运行结果:
# python send.py [x] Sent 'Hello World!' # python receive.py [*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World!'