本文介绍RabbitMQ在python下的基本使用
1. RabbitMQ安装,安装RabbitMQ需要预安装erlang语言,Windows直接下载双击安装即可
RabbitMQ下载地址:http://www.rabbitmq.com/download.html
Erlang语言下载地址:http://www.erlang.org/downloads
2. RabbitMQ的基本使用
发送消息端
# -*- coding:utf-8 -*- # Author:Wong Du import pika # 生成RabbitMQ连接对象 connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) # 建立连接管道 channel = connection.channel() # 进行队列声明,指定消息要发送到的队列 # durable声明队列持久性,即RabbitMQ重启队列仍旧不丢失(不完全可靠) # 对现有队列进行durable声明更改无法生效,只有重新声明一个新的队列才生效 channel.queue_declare(queue="hello4", durable=True) # 发送消息到RabbitMQ消息队列里 channel.basic_publish( exchange='', # 指定交换器 routing_key="hello4", # 要绑定的队列 body="Nice to meet you22223333...", # 要发送的消息 properties=pika.BasicProperties( delivery_mode=2, # 通过delivery_mode=2将队列内的消息持久化 ) ) print("[x] sent 'Nice to meet you22233332...'") connection.close()
接收消息端
# -*- coding:utf-8 -*- # Author:Wong Du import pika import time # 生成RabbitMQ连接对象 connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) # 建立连接管道 channel = connection.channel() # 声明队列名 channel.queue_declare(queue="hello4", durable=True) # 定义构建回调函数 def callback(ch, method, properties, body): print(ch) # time.sleep(20) print("[x] received %r" % body) # ch.basic_ack(delivery_tag= method.delivery_tag) # 接收端回复消息给rabbixmq,代表该消息处理已完成
# 默认情况下,RabbitMQ循环把消息发送给consumer, # 通过basic_qos(prefetch_count=1)设置,可以在处理并确认完前一个消息之前,不再接收新信息 # 即实现“能者多劳”的效果 channel.basic_qos(prefetch_count=1)
channel.basic_consume( callback, queue="hello4", no_ack=True, # 如果为True,rabbitmq会在consumer接收到数据时就删除队列中的消息 ) print("[*] waiting for messages. To exit press CTRL+C...") channel.start_consuming()
-----------------------拓展一吓,RabbitMQ消息转发类型:分类筛选,广播,正则匹配------------------------------
3. 分类筛选:RabbitMQ通过消息分类进行对应发送和接收
发送消息端
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3 4 import pika 5 import sys 6 7 # 生成RabbitMQ连接对象 8 connection = pika.BlockingConnection( 9 pika.ConnectionParameters('localhost') 10 ) 11 12 # 建立连接管道 13 channel = connection.channel() 14 15 # 声明交换器和交换器转发消息类型;'direct'为分类筛选 型 16 channel.exchange_declare(exchange='_direct', 17 exchange_type='direct') 18 19 # 获取脚本命令:根据脚本命令,定义对应严重级别的消息(默认为info类消息) 20 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 21 # 获取要发送给RabbitMQ的消息(默认为Hello world!) 22 msg = "".join(sys.argv[2:]) or "Hello world!" 23 '''例:python direct分类筛选_producer.py info 无关痛痒的消息 24 表示给RabbitMQ发送一条info级别的消息,消息内容:无关痛痒的消息 25 ''' 26 27 channel.basic_publish(exchange='_direct', # 指定交换器 28 routing_key=severity, # 指定要筛选的字段 29 body=msg # 要发送给RabbitMQ的消息 30 ) 31 32 print("----exec done----") 33 connection.close()
接收消息端
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 # -*- coding:utf-8 -*- 2 # Author:Wong Du 3 4 import pika 5 import sys 6 7 # 生成RabbitMQ连接对象 8 connection = pika.BlockingConnection( 9 pika.ConnectionParameters('localhost') 10 ) 11 12 # 建立连接管道 13 channel = connection.channel() 14 15 # 声明交换器和交换器消息转发类型;'direct'为分类筛选 型 16 channel.exchange_declare(exchange='_direct', 17 exchange_type='direct',) 18 19 # 声明随机独家管道,用以接收RabbitMQ的消息 20 Random_Queue = channel.queue_declare(exclusive=True) 21 # 获取管道名 22 queue_name = Random_Queue.method.queue 23 24 # 获取脚本命令;获取相应严重级别的消息,可多个严重级别 25 severities = sys.argv[1:] 26 if not severities: 27 sys.stderr.write("