生产者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = connection.channel() # 声明一个管道,在管道里发消息 # 声明queue channel.queue_declare(queue = 'hello' , durable = True ) # 在管道里还得声明一个队列 # durable只是把队列持久化,消息不持久化 channel.basic_publish(exchange = '', routing_key = 'hello' , # 就是列队queue名字 body = 'Hello World' , # 消息内容 properties = pika.BasicProperties( delivery_mode = 2 , #消息持久化如果队列没有设置durable=True的话消息是没有办法持久化的 ) ) print ( " [x] Sent 'Hello World!'" ) connection.close() # 不用关闭管道,关闭连接就行 |
消费者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
import pika # 建立到达RabbitMQ Server的connection # 此处RabbitMQ Server位于本机-localhost connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' )) channel = connection.channel() # 声明queue,确认要从中接收message的queue # queue_declare函数是幂等的,可运行多次,但只会创建一次 # 若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue # 但在producer和consumer中重复声明queue是一个好的习惯 channel.queue_declare(queue = 'hello' ,durable = True ) print ( ' [*] Waiting for messages. To exit press CTRL+C' ) channel.basic_qos(prefetch_count = 1 ) #如果有一个消息,服务器就不发,没消息就发 # 定义回调函数 # 一旦从queue中接收到一个message回调函数将被调用 # ch:channel # method: # properties: # body:message def callback(ch, method, properties, body): print ( " [x] Received %r" % body) ch.basic_ack(delivery_tag = method.delivery_tag) #执行完后确认,client执行完后给rabbitmq返回的一个标识,收到这个标识后rabbitmq认为这个消息处理完了,不会在重复发送给其他client继续执行 # 从queue接收message的参数设置 # 包括从哪个queue接收message,用于处理message的callback,是否要确认message # 默认情况下是要对消息进行确认的,以防止消息丢失。 # 此处将no_ack明确指明为True,不对消息进行确认。 channel.basic_consume(callback, queue = "hello" , #no_ack=True#不对消息确认 ) # 开始循环从queue中接收message并使用callback进行处理 channel.start_consuming() |