zoukankan      html  css  js  c++  java
  • RabbitMQ之Topics(多规则路由)

    Exchange中基于direct类型无法基于多种规则进行路由。

    例如分析syslog日志,不仅需要基于severity(info/warning/critical/error)进行路由,还需要基于auth、cron或者kernal模式进行路由。

    Topic exchange可以满足这种需求。

    Topic exchange

    基于topic类型交换器的routing key不是唯一的,而是一系列词,基于点区分。

    例如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"

    binding key也是类型方式。

    *表示只匹配一个关键字

    #可以匹配0或者多个关键字

    Q1队列匹配对所有orange的关注

    Q2队列可以监听rabbits,以及所有lazy开头的

    quick.orange.fox只会进入Q1,lazy.brown.fox只会进入Q2

    lazy.pink.rabbit尽管和两个topic都匹配上,但是只会进入队列以此

    quick.brown.fox不会和任何队列进行绑定,因此会被丢弃

     如果发送orange或者quick.orange.male.rabbit不会和任何队列进行绑定,也都会被丢弃。

    Topic exchange功能很强大,类似其他exchange。当一个队列设置binding key为#,则它可以接收所有消息,不管routing key如何设置,类似fanout exchange。

    当不设置*和#的话,topic exchange就类似direct exchange。

    例子

    发送消息

    #!/usr/bin/env python
    #-*- coding:utf8 -*-
    import sys 
    import pika
    import logging
    
    logging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL)
    
    def emit_log():
    
        pika.connection.Parameters.DEFAULT_HOST = 'localhost'
        pika.connection.Parameters.DEFAULT_PORT = 5672
        pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/' 
        pika.connection.Parameters.DEFAULT_USERNAME = 'guosong'
        pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong'
    
        para = pika.connection.Parameters()
    
        connection = pika.BlockingConnection(para)
    
        channel = connection.channel()
        #声明一个topic_logs交换器,类型为topic
        channel.exchange_declare(exchange='topic_logs',type='topic')
    
        #指定路由key
        routing_key = sys.argv[1] if len(sys.argv) >1 else 'anonymous.info'
    
        message = ' '.join(sys.argv[2:]) or "info:Hello World!"                                         
    
        #发送的时候指定routing_key为空,没有绑定队列到交换器上,消息将会丢失
        #对于日志类消息,如果没有消费者监听的话,这些消息就会忽略
        channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
    
        #%r也是string类型
        print "[x] Sent %r" % (message,)
    
        connection.close()
    
    if __name__ == '__main__':
        emit_log()
    

    接收消息

    def callback(ch, method, properties, body):
        print " [x] %r " % (body,)
    
    def receive_logs():
    
        pika.connection.Parameters.DEFAULT_HOST = 'localhost'
        pika.connection.Parameters.DEFAULT_PORT = 5672
        pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/' 
        pika.connection.Parameters.DEFAULT_USERNAME = 'guosong'
        pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong'
    
        para = pika.connection.Parameters()
    
        connection = pika.BlockingConnection(para)
    
        channel = connection.channel()
    
        #声明一个topic_logs交换器,类型为topic
        channel.exchange_declare(exchange='topic_logs',type='topic')
    
        #声明一个随机队列,设置exclusive=True,在该consumer退出的时候,对应的队列被删除
        result = channel.queue_declare(exclusive=True)
        #获取随机队列的名称
        queue_name = result.method.queue
    
        binding_keys = sys.argv[1:]
    
        if not binding_keys:
            print >> sys.stderr, "Usage: %s [binding_key]" %  sys.argv[0]
            sys.exit(1)
    
        for binding_key in binding_keys:
             #绑定交换器和队列
            channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)
        print '[*] Wating for logs.To exit press CTRL+C'
    
        #开始消费消息
        channel.basic_consume(callback,queue=queue_name,no_ack=True)
    
        channel.start_consuming()
    

    接收所有消息

    python receive_logs_topic.py "#"
    

    接收kern开头,包含一个关键字

    python receive_logs_topic.py "kern.*"

    多个绑定

    python receive_logs_topic.py "kern.*" "*.critical"
    
    guosong@guosong:~/code/rabbitmq/ch5$ python emit_log.py "kern.critical" "A critical kernel error"[x] Sent 'A critical kernel error'
    
    guosong@guosong:~/code/rabbitmq/ch5$ ./receive_logs.py "kern.*"
    [*] Wating for logs.To exit press CTRL+C
     [x] 'A critical kernel error' 
    

    参考链接

    http://www.rabbitmq.com/tutorials/tutorial-five-python.html

  • 相关阅读:
    Win32汇编
    Boost ASIO 实现异步IO远控
    Python 使用oslo.vmware管理ESXI虚拟机
    Python 巡检接入钉钉机器人
    Django Ajax序列化与反序列化
    Nacos 认证绕过
    Lanproxy 遍历目录漏洞 CVE-2021-3019 附批量POC
    Apache Solr 全版本任意读取文件漏洞
    垂直水平居中的多种方法 主要的4种
    vue provide/inject 父组件如何给孙子组件传值
  • 原文地址:https://www.cnblogs.com/gsblog/p/3825197.html
Copyright © 2011-2022 走看看