Routing
此案例中,我们会尝试更多的特性,例如,仅仅将错误信息传递到Log文件中(节省空间),同时仍能将所有信息打印到控制台上
Bindings有一个额外的参数routing_key,为了防止和basic_publish参数混淆,我们将它叫做binding_key.这是我们怎么去创建一个binding_key:
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
这意味着binding key 取决于exchange类型,fanout exchanges 之前使用的exchanges忽略了它的价值
Direct exchange
之前教程的日志系统广播所有的信息给所有的消费者,我们想要扩展允许过滤信息根据严重性,例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重的错误,并且warning或者info日志消息不会浪费空间.
我们使用的fanout exchange它并没有给我们带来太大的灵活性,它只能进行无意识的广播,
我们将使用direct exchange代替,direct exchange路由是很简单的,消息进入绑定的binding和路由的key相匹配的队列里,为了说明这一点,请考虑以下设置:
在此设置中,我们可以看到direct exchange X 和两个队列绑定,第一个队列绑定orange,第二个它是绑定了两个,一个是绑定了black key 另一个是绑定了green。
在这种设置中,通过路由键橙色发布到交换机的消息将被路由到队列Q1。 路由键为黑色或绿色的消息将转到Q2。 所有其他消息将被丢弃。
多重绑定
用相同的binding key绑定多个队列是完全合法的,示例中我们可以在X和Q1之间增加black key 这个绑定是和fanout类似,广播到所有匹配的队列中,路由为black key 的消息将会发送给Q1,Q2
Emitting logs
我们将会使用这个模式的日志系统代替fanout,我们发送消息给cirect exchange,我们将会提供严重日志作为routing key,这样接收脚本将会选择想要接收的严重性日志,让我们关注emitting。
首先我们需要创建一个exchange
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
然后发送消息
channel.basic_publish(exchange='direct_logs',
routing_key=severity,#不能删除
body=message,
properties=pika.BasicProperties(delivery_mode=2,#make message persistent
))
To simplify things we will assume that 'severity' can be one of 'info', 'warning', 'error'.
订阅
接收消息和之前是一样的,有一个例外,我们将创建一个新的绑定和每种严重性相关
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
Putting it all together
direct_send.py
# -*- coding: utf-8 -*-
#send端
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))#默认端口5672,可不写
#创建通道,声明一个管道,在管道里发送消息
channel = connection.channel()
#在管道里声明queue
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
severity = sys.argv[1] if len(sys.argv)>1 else 'info'
message = ''.join(sys.argv[2:]) or "Hello World"
#一条消息永远不能直接发送到队列,它总需要经过一个交换exchange
channel.basic_publish(exchange='direct_logs',
routing_key=severity,#不能删除
body=message,
properties=pika.BasicProperties(delivery_mode=2,#make message persistent
))#设置routing_key(消息队列的名称)和body(发送的内容)
print("[x] Sent %r" % message)
connection.close()#关闭连接,队列关闭
direct_receive.py
# -*- coding: utf-8 -*-
#receiving(消费者接收者)
import pika
import time
import sys
#创建一个连接
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost'))#默认端口5672,可不写
#创建通道,声明一个管道,在管道里发送消息
channel = connection.channel()
#把消息队列的名字为hello,把消费者和queue绑定起来,生产者和queue的也是hello
#为什么又声明了一个hello队列
#如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
#不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]
" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
#回调函数get消息体
def callback(ch,method,properties,body):#四个参数为标准格式
#管道内存对象,内容相关信息
#print("打印看下是什么:",ch,method,properties) #打印看下是什么
print(" [x] Received %r:%r" % (method.routing_key,body))
ch.basic_ack(delivery_tag=method.delivery_tag)#消息确认
#消费消息
channel.basic_consume(
queue=queue_name,#你要从那个队列里收消息
on_message_callback=callback,#如果收到消息,就调用callback函数来处理消息
auto_ack=True #写的话,如果接收消息,机器宕机消息就丢了
#一般不写,宕机则生产者检测到发给其他消费者
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #创建死循环,监听消息队列,可使用CTRL+C结束监听