zoukankan      html  css  js  c++  java
  • 基于Python语言使用RabbitMQ消息队列(四)

    路由

    在上一节我们构建了一个简单的日志系统。我们能够广播消息给很多接收者。

    在本节我们将给它添加一些特性——我们让它只订阅所有消息的子集。例如,我们只把严重错误(critical error)导入到日志文件(存入磁盘空间),但仍然可以打印所有日志消息到控制台。

    绑定

    前面的例子中我们已经创建了绑定,像下面这样:

    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name)
    • 1
    • 2

    绑定是一个交易所和一个队列之间的关系。这可以解释为:一个队列对源于这个交易所的消息感兴趣。 
    绑定可以采用一个额外的routing_key 参数。为避免同basic_publish参数混淆,我们称呼它为绑定键(binding key)。我们用一个键来创建一个绑定This is how we could create a binding with a key:

    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name,
                       routing_key='black')
    • 1
    • 2
    • 3

    一个绑定键的意义取决于交易所类型。我们先前使用过的fanout类型交易所就会忽略它的值。

    直接型交易所(Direct exchange)

    我们先前的日志系统会广播所有消息给所有消费者。我们现在想扩展它让过滤掉一些消息,基于这些消息的严重级别。例如我们可能想要向磁盘写日志的脚本只接收严重错误critical error,不要浪费磁盘空间在warning或info日志上面。

    我们正使用的fanout类型交易所不会给我们太大的灵活性——它只能够无意识地进行广播。

    我们会使用直接型交易所进行替代,直接型交易所背后的路由算法很简单——一条消息会前往绑定键(binding key)恰巧匹配这条消息的路由键(routing key)的队列。 
    为了阐释这个问题,考虑下图的设定: 
    这里写图片描述 
    在这个设定当中我们看到直接型交易所X有两个队列与之绑定。第一个队列的绑定键为“orange”,第二个有两个绑定键,一个是“black”,另一个是“green”。

    在这个设定中,发布到交易所中的带有路由键“orange”的会被路由到队列Q1。带有路由键“black”和“green”的会前往Q2。所有其他的消息会被忽略。

    多重绑定

    这里写图片描述 
    使用相同的绑定键绑定多个队列完全没有问题。在我们的例子中我们可以用绑定键“black”在交易所X和队列Q1之间添加绑定。这样的话,direct型交易所就会表现的像fanout交易所,广播消息给所有匹配的队列。具有“black”路由键的消息会被传送给Q1和Q2。

    生成日志

    我们会为日志系统使用这个模型。发送消息到direct交易所,而非fanout交易所。我们会提供日志级别(log severity)作为路由键。这样接收脚本就能够选择它想要的级别来接收。我们首先关注生成日志。

    像通常我们需要创建一个交易所时那样:

    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
    • 1
    • 2

    准备好发送消息:

    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    • 1
    • 2
    • 3

    为了使事情简单我们假设’severity’ 是 ‘info’, ‘warning’, ‘error’中的一种。

    订阅

    接收消息就跟之前的教程中一样,不同的是——我们要为每种severity创建一个新的绑定。

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    整合

    这里写图片描述 
    emit_log_direct.py完整代码:

    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
    
    severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    receive_logs_direct.py完整代码:

    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    我在自己的Ubuntu终端开启了三个控制台,两个用来接收日志消息,其中一个设置为只接收error日志,并把收到的日志存入日志文件。另一个接收info,warning和error,直接打印到屏幕。如下图: 
    写入文件: 
    这里写图片描述 
    打印到屏幕: 
    这里写图片描述

    生成日志: 
    这里写图片描述 
    查看日志文件确实只收到了error日志: 
    这里写图片描述

  • 相关阅读:
    How does a single thread run on multiple cores?
    一篇所有研究生都该读的好文:阳光温热 科研静好
    Running Slic3r from git on GNU Linux
    3D打印的各种问题及解决方案
    新工科的新视角:面向可持续竞争力的敏捷教学体系
    What is Modularity
    3d打印模型为什么文件格式必须是stl和stp的?
    剑指offer 39.平衡二叉树
    剑指offer 38.二叉树的深度
    剑指offer 37.数字在排序数组中出现的次数
  • 原文地址:https://www.cnblogs.com/ExMan/p/10283996.html
Copyright © 2011-2022 走看看