https://blog.csdn.net/appleyuchi/article/details/79190113
队列和消息是两个概念?
假如消息队列test里面还有消息等待消费者(consumers)去接收,但是这个时候服务器端宕机了,这个时候消息是否还在?
1、队列消息非持久化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import
pika
# 声明一个socket 实例
connect
=
pika.BlockingConnection(pika.ConnectionParameters(
"localhost"
))
# 声明一个管道
channel
=
connect.channel()
# 声明queue名称为test
channel.queue_declare(queue
=
"test"
)
#RabbitMQ的消息永远不会被直接发送到队列中,它总是需要经过一次交换
channel.basic_publish(exchange
=
'',
routing_key
=
"test"
,
body
=
"hello word"
)
print
(
"Sent 'hello world'"
)
connect.close()
channel.queue_declare(queue
=
"test"
)
①队列持久化很简单,只需要在服务端(produce)声明queue的时候添加一个参数:
channel.queue_declare(queue
=
'shuaigaogao'
, durable
=
True
)
# durable=True 持久化
channel.basic_publish(exchange
=
"",
routing_key
=
"shuaigaogao"
,
#queue的名字
body
=
"hello world"
,
#body是要发送的内容
properties
=
pika.BasicProperties(delivery_mode
=
2
,)
# make message persistent=>使消息持久化的特性
)
1
channel.queue_declare(queue
=
'shuaigaogao'
, durable
=
True
)
小结:
RabbitMQ在服务端没有声明队列和消息持久化时,队列和消息是存在内存中的,服务端宕机了,队列和消息也不会保留。
- 服务端声明持久化,客户端想接受消息的话,必须也要声明queue时,也要声明持久化,不然的话,客户端执行会报错。
以上两句是整篇文章的重中之重!!!
RabbitMQ 消息公平分发
可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_qos(prefetch_count
=
1
)
服务端:
import
pika
# 声明一个socket 实例
connect
=
pika.BlockingConnection(pika.ConnectionParameters(
"localhost"
))
# 声明一个管道
channel
=
connect.channel()
# 声明queue名称为test
channel.queue_declare(queue
=
"test"
, durable
=
True
)
# 队列持久化
#RabbitMQ的消息永远不会被直接发送到队列中,它总是需要经过一次交换
channel.basic_publish(exchange
=
'',
routing_key
=
"test"
,
body
=
"hello word"
,
properties
=
pika.BasicProperties(delivery_mode
=
2
,))
# 消息持久化
print
(
"Sent 'hello world'"
)
connect.close()
import
pika
import
time
# 声明socket实例
connect
=
pika.BlockingConnection(pika.ConnectionParameters(
"localhost"
))
# 声明一个管道 虽然在之前的produce代码中声明过一次管道,
# 但是在不知道produce中的管道是否运行之前(如果未运行,consumers中也不声明的话就会报错),
# 在consumers中也声明一次是一种正确的做法
channel
=
connect.channel()
#声明queue
channel.queue_declare(queue
=
"test"
, durable
=
True
)
#回调函数
def
callback(ch, method, properites, body):
time.sleep(
30
)
print
(
"-----"
, ch, method, properites, body)
print
(
"Received %r"
%
body)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
# 手动确认收到消息,添加手动确认时,no_ack必须为False,不然就会报错
channel.basic_qos(prefetch_count
=
1
)
# 在消息消费之前加上消息处理配置
channel.basic_consume(callback,
queue
=
"test"
,
no_ack
=
False
)
print
(
"Waiting for messages"
)
#这个start只要一启动,就一直运行,它不止收一条,而是永远收下去,没有消息就在这边卡住
channel.start_consuming()