zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列(五): 主题分发

    1. 主题(Topics):

    fanout模式只能进行简单的广播,direct模式虽然在过滤上进行了一定的提升,但是不能支持复杂的条件,

    比如我们的日志消息,现在不仅要知道消息级别,也要知道消息来源。在这样的复杂需求下,我们需要使用

    主题交换。

    2. 主题交换:

    发送主题交换的的routing_key不是任意的,必须遵循如下格式:使用.分隔的一些字。通常这些字用来表示

    消息的某些特性,如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。

    注意routing_key的最大长度是255。

    绑定routing_key也必须是同样的格式,交换后端形式与直接路由相似,交换匹配消息中的routing_key和绑定队列

    所需要接受消息的routing_key,并且将满足条件的消息进行派发。

    通配符:

    * -- 代表一个字(word)

    # -- 代表零个或者多个字

    如下图模型,我们使用"<celerity>.<colour>.<species>"来形容动物,可见Q1关心所有橘黄的动物,

    Q2关心所有兔子或者懒惰的动物。

    "quick.orange.rabbit" -- 分发到Q1和Q2

    "lazy.orange.elephant" -- 分发到Q1和Q2

    "quick.orange.fox" -- 分发到Q1

    "lazy.brown.fox" -- 分发到Q2

    "lazy.pink.rabbit" -- 只分发一次到Q2,尽管匹配两个条件

    "quick.brown.fox" -- 无匹配,丢弃

    "quick.orange.male.rabbit" -- 无匹配,丢弃

    "lazy.orange.male.rabbit" -- 匹配规则3,分发到Q2

    3. 测试代码:

    emit_log_topic.py

     1 #!/usr/bin/env python
     2 import pika
     3 import sys
     4 
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(
     6         host='localhost'))
     7 channel = connection.channel()
     8 
     9 channel.exchange_declare(exchange='topic_logs',
    10                          type='topic')
    11 
    12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    13 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    14 channel.basic_publish(exchange='topic_logs',
    15                       routing_key=routing_key,
    16                       body=message)
    17 print(" [x] Sent %r:%r" % (routing_key, message))
    18 connection.close()

    receive_logs_topic.py

    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
  • 相关阅读:
    OpenCV---在图片上加入文字
    DosBox 报错 this program requires dosxnt.exe to be in your path
    iOS开发-UITableView单选多选/复选实现1
    LeetCode第七题,Reverse Integer
    【PostgreSQL】PostgreSQL操作-psql基本命令
    Bootstrap的js插件之弹出框(popover)
    Qt Quick 图像处理实例之美图秀秀(附源代码下载)
    【甘道夫】并行化频繁模式挖掘算法FP Growth及其在Mahout下的命令使用
    用Visual Studio高版本号打开低版本号的project,转换时出现错误:fatal error LNK1123: 转换到 COFF 期间失败: 文件无效或损坏
    如何安装ArchLinux
  • 原文地址:https://www.cnblogs.com/wanpengcoder/p/5292954.html
Copyright © 2011-2022 走看看