生产者:
''' 使用rabbitMQ 要先安装pika 模块 ''' import pika #建立一个socket connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #声明一个管道 channel = connection.channel() #声明queue #durable 将队列持久化 , 当rabbitMQ 服务停止掉 , 队列不会丢失(队列中的消息没有持久化) #将消息持久化可以在channel.basic_publish 中加入如下参数 # properties = pika.BasicProperties( # delivery_mode=2 # ) channel.queue_declare(queue='hello1',durable=True) #通过管道发送消息 # routing_key 是queue 的name #body 是消息内容 #properties 将消息持久化 channel.basic_publish( exchange='', routing_key='hello1', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, ) ) print("发送 hello world") #关闭队列 connection.close()
消费者:
import pika #创建一个链接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #创建一个管道 channel = connection.channel() #声明一个队列,要从这个队列中接收消息 #在生产者中也定义了这个queue, 是因为不知道生产者先启动还是消费者先启动,为了确保这个queue一定存在,所以在两边都定义了这个queue #durable 将队列持久化 , 当rabbitMQ 服务停止掉 , 队列不会丢失(队列中的消息没有持久化) channel.queue_declare(queue='hello1',durable=True) #回调函数 #ch 管道内存对象 这里边是channel #method def callback(ch,method , properties,body): print("接收:%s" %body) #手动确认完成 # ch.basic_ack(delivery_tag=method.delivery_tag) #当前没有处理的消息才会接收消息 channel.basic_qos(prefetch_count=1) #开始接收处理消息 #callback : 回调函数, 来处理消息 , 所以callback 中写处理逻辑 #queeu 队列名称 #no_ack 当处理完成之后是否返回确认, 默认是确认即 no_ack = false , 如果no_ack = false , 当一个消费者处理一半的时候挂掉了,那么这个消息不会丢失,将由其他消费者处理 channel.basic_consume(callback, queue='hello1', no_ack=True ) print('等待接收数据') #开始接收消息,没有消息将会卡住 channel.start_consuming()