zoukankan      html  css  js  c++  java
  • [rabbitmq] python版本(四) 路由

    路由Routing

    这篇主要较上一篇新增一个功能--只接收订阅消息的一个字集。eg.只把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然讲所有日志信息输出到控制台中。
    简单说来就是routing_key决定某一条交付给交换机exchange传给哪个队列,可以将多个routing_key设定给一个队列/也可以将同一个routing_key设定给多个队列(这种情况其实就是实现广播)

    绑定Bindings

    绑定的的时候可以带上一个额外的routing_key参数。为了避免与basic_publish的参数混淆,叫做绑定的键binding key。创建一个带binding key的绑定

    channel.queue_bind(exchange=exchange_name,
                       queue=queue_name,
                       routing_key='black')
    

    binding key具体意义与采用的exchange类型有关,扇形交换机fanout exchanges会忽略这个值.

    直连交换机direct exchange

    相较于广播的方式,Direct exchange主要实现一种分类/过滤(扇形交换机fanout exchange就没有足够的灵活性,只是广播)。算法:交换机讲绑定键(binding key--在上面的queue_bind的时候创建的,即在绑定交换机和队列的时候设定的)和路由键(routing key--下面basic_publish中设定的,即发布消息的时候设定的)进行精确匹配,从而确定消息发送到哪一个队列,如下图

    在这个场景中,我们可以看到直连交换机 X和两个队列进行了绑定。第一个队列使用orange作为绑定键,第二个队列有两个绑定,一个使用black作为绑定键,另外一个使用green。

    这样以来,当路由键为orange的消息发布到交换机,就会被路由到队列Q1。路由键为black或者green的消息就会路由到Q2。其他的所有消息都将会被丢弃。

    这种情况即:多个binding key可以被绑定到同一个queue上面

    多个绑定multiple bindings


    多个队列使用相同的绑定键是合法的。这个例子中,我们可以添加一个X和Q1之间的绑定,使用black绑定键。这样一来,直连交换机就和扇型交换机的行为一样,会将消息广播到所有匹配的队列。带有black路由键的消息会同时发送到Q1和Q2。

    这种情况即:一个binding key绑定多个queue上面

    发送日志

    我们将会发送消息到一个直连交换机,把日志级别作为路由键。这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。我们先看看发送日志。
    创建一个交换机exchange

    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
    

    发送一条消息

    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    

    这里的严重程度severity值为info/warning/error中的一个

    订阅

    处理接收消息方式和之前差不多,只有一个例外,我们将会为我们感兴趣的每个严重级别分别创建一个新的绑定

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    

    代码整合


    简单说下这个图:交换机类型--direct,交换机和队列之间的binding key分为三种,这个写了error/info/warning,队列名是默认随机生成的

    emit_log_direct.py

    #!/usr/bin/env python
    import pika
    import sys
    
    #连接
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    #声明名为direct_logs队列,类型direct
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
    
    #命令行参数第一个为严重程度
    severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
    #从第二个命令行参数开始为信息
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    #basic_publish发布这些信息,交换机为direct_logs,routing_key为上面第一个命令行参数(默认为info类型,作为分类标准),消息体就是Message,是从第二个命令行参数开始的其他参数
    channel.basic_publish(
        exchange='direct_logs', routing_key=severity, body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    #做一些关闭之前的操作
    connection.close()
    

    receive_logs_direct.py

    #!/usr/bin/env python
    import pika
    import sys
    
    #连接
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    #声明一个名为direct_logs的交换机,类型为direct
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
    
    #声明一个随机名字的队列,断开后有处理,名字存在queue_name中
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    
    #命令行参数为严重程度,如果没有命令行参数提示错误-->可以是一种或者多种
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    #绑定交换机exchange和队列,routing_key--分类标准就是严重程度 
    for severity in severities:
        channel.queue_bind(
            exchange='direct_logs', queue=queue_name, routing_key=severity)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    

    运行结果:可以看到接收端和发送端对于指定的info类型信息的交互

    如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:

    $ python receive_logs_direct.py warning error > logs_from_rabbit.log
    

    如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:

    $ python receive_logs_direct.py info warning error
     [*] Waiting for logs. To exit press CTRL+C
    

    如果要触发一个error级别的日志,只需要输入:

    $ python emit_log_direct.py error "Run. Run. Or it will explode."
     [x] Sent 'error':'Run. Run. Or it will explode.'
    
    日积月累,水滴石穿
  • 相关阅读:
    纹理mag filter不能取GL_XXX_MIPMAP_XXXX
    (转)No architectures to compile for (ONLY_ACTIVE_ARCH=YES, active arch=arm64, VA 解决办法
    轻松制作儿童趣味算术软件
    批处理设置IP地址
    安卓手机文件管理器简单横向评比
    Linux基础和网络管理上机试题
    值得收藏的批处理程序
    王垠:完全用Linux工作
    XINU安装程序.exe一键配置好XINU实验环境
    很全面的WinRAR实用技巧系列
  • 原文地址:https://www.cnblogs.com/lonelyisland/p/12753493.html
Copyright © 2011-2022 走看看