zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列(五):Routing 消息路由

    上篇文章中,我们构建了一个简单的日志系统。接下来,我们将丰富它:能够使用不同的severity来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。

    1. Bindings绑定

        上篇文章中我们是这么做的绑定:

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. channel.queue_bind(exchange=exchange_name,  
    2.                    queue=queue_name)  

        绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。

        实际上,绑定可以带routing_key 这个参数。其实这个参数的名称和basic_publish 的参数名是相同了。为了避免混淆,我们把它成为binding key。
        使用一个key来创建binding :

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. channel.queue_bind(exchange=exchange_name,  
    2.                    queue=queue_name,  
    3.                    routing_key='black')  

    对于fanout的exchange来说,这个参数是被忽略的。

    2. Direct exchange

      Direct exchange的路由算法非常简单:通过binding key的完全匹配,可以通过下图来说明。 


        exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
        当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。


    3. Multiple bindings

          多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver到Q1和Q2。其余的Message都会被丢弃。

       

    4. Emitting logs

    首先是我们要创建一个direct的exchange:

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. channel.exchange_declare(exchange='direct_logs',  
    2.                          type='direct')  

    我们将使用log的severity作为routing key,这样Consumer可以针对不同severity的log进行不同的处理。
    publish:

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. channel.basic_publish(exchange='direct_logs',  
    2.                       routing_key=severity,  
    3.                       body=message)  

    我们使用三种severity:'info', 'warning', 'error'.

    5. Subscribing

    对于queue,我们需要绑定severity:

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. result = channel.queue_declare(exclusive=True)  
    2. queue_name = result.method.queue  
    3.   
    4. for severity in severities:  
    5.     channel.queue_bind(exchange='direct_logs',  
    6.                        queue=queue_name,  
    7.                        routing_key=severity)  


    6. 最终版本

    The code for emit_log_direct.py:

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. #!/usr/bin/env python  
    2. import pika  
    3. import sys  
    4.   
    5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
    6.         host='localhost'))  
    7. channel = connection.channel()  
    8.   
    9. channel.exchange_declare(exchange='direct_logs',  
    10.                          type='direct')  
    11.   
    12. severity = sys.argv[1] if len(sys.argv) > else 'info'  
    13. message = ' '.join(sys.argv[2:]) or 'Hello World!'  
    14. channel.basic_publish(exchange='direct_logs',  
    15.                       routing_key=severity,  
    16.                       body=message)  
    17. print " [x] Sent %r:%r" % (severity, message)  
    18. connection.close()  


    The code for receive_logs_direct.py:

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. #!/usr/bin/env python  
    2. import pika  
    3. import sys  
    4.   
    5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
    6.         host='localhost'))  
    7. channel = connection.channel()  
    8.   
    9. channel.exchange_declare(exchange='direct_logs',  
    10.                          type='direct')  
    11.   
    12. result = channel.queue_declare(exclusive=True)  
    13. queue_name = result.method.queue  
    14.   
    15. severities = sys.argv[1:]  
    16. if not severities:  
    17.     print >> sys.stderr, "Usage: %s [info] [warning] [error]" %   
    18.                          (sys.argv[0],)  
    19.     sys.exit(1)  
    20.   
    21. for severity in severities:  
    22.     channel.queue_bind(exchange='direct_logs',  
    23.                        queue=queue_name,  
    24.                        routing_key=severity)  
    25.   
    26. print ' [*] Waiting for logs. To exit press CTRL+C'  
    27.   
    28. def callback(ch, method, properties, body):  
    29.     print " [x] %r:%r" % (method.routing_key, body,)  
    30.   
    31. channel.basic_consume(callback,  
    32.                       queue=queue_name,  
    33.                       no_ack=True)  
    34.   
    35. channel.start_consuming()  

    我们想把warning和error的log记录到一个文件中:

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. $ python receive_logs_direct.py warning error > logs_from_rabbit.log  

    打印所有log到屏幕:

    [python] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. $ python receive_logs_direct.py info warning error  
    2.  [*] Waiting for logs. To exit press CTRL+C  
  • 相关阅读:
    通讯录封装实现
    简单通讯录的实现 main..h .m文件全部
    iOS 开发 OC编程 字典和集合 排序方法
    iOS 开发 OC编程 数组冒泡排序.图书管理
    iOS 开发 OC编程 属性和字符串练习
    iOS 开发 OC编程 属性和字符串
    iOS 开发 OC编程 便利构造器以及初始化方法
    iOS 开发 OC编程 方法的书写
    IOS 开发 OC编程 类和对象
    iOS 开发 c语言阶段考试题
  • 原文地址:https://www.cnblogs.com/amylis_chen/p/6287688.html
Copyright © 2011-2022 走看看