zoukankan      html  css  js  c++  java
  • Go RabbitMQ(五)主题

    RabbitMQ topic

    在之前我们将交换器的类型从fanout设置为direct后能够根据我们的选择获得响应的消息,虽然改良我们的消息日志系统,但是还有很多局限性,比如它不能基于多个标准进行路由

    在我们的日志系统中我们可能不仅仅是依据消息的严重性进行订阅,还有可能同时基于消息的危险等级和消息来源,比如我们监听来自cron的危险错误和来自kern的所有日志。通过topic我们可以来实现以上功能

    主题交换器(topic exchange)

    消息如果发送到主题交换器的话不能使用任何的routing_key,它必须是由点分隔的单词列表。单词可以是任意的,但通常它们是与消息相关的一些特性

    binding key必须是具有相同格式,topic交换器背后的逻辑跟direct交换器的逻辑类似,一个指定了routing key的消息将会被投递到所有使用binding key并与routing key 相匹配的队列中。binding key有两种特殊情况:

    • * 可以代表代替一个单词
    • # 可以代替0个或多个单词

    在本例子中,我们将发送所有描述动物的消息,这些消息将使用由三个单词(两个点)组成的routing_key发送。routing_key第一个单词描述速度,第二个表示颜色,第三个表示物种

    队列Q1使用binding_key:*.orange.*,队列Q2使用binding_key*.*.rabbitlazy.#。总结如下:

    • Q1队列将会接收所有orange的动物,比如quick.orange.rabbit,lazy.orange.elephant,quick.orange.fox,
    • Q2队列会接收所有跟rabbit相关和lazy类型的动物,比如quick.orange.rabbit,lazy.orange.elephant,lazy.brown.fox,lazy.pink.rabbit
    • quick.brown.fox跟以上两个队列的routing_key都不匹配所以该消息会被丢弃
    • lazy.orange.male.rabbit则只能匹配队列Q2
    topic 交换器非常灵活并且可以表现为其他交换器
    比如设置队列的binding_key为#,则队列会接收所有的消息不管routing_key是什么
    当binding_key不使用特殊字段`*`和`#`的时候,此时topic交换器跟direct交换器一样
    

    完整代码如下:

    • emitLogsTopic.go
    func main() {
            conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(err, "Failed to connect to RabbitMQ")
            defer conn.Close()
    
            ch, err := conn.Channel()
            failOnError(err, "Failed to open a channel")
            defer ch.Close()
    
            err = ch.ExchangeDeclare(
                    "logs_topic", // name
                    "topic",      // type
                    true,         // durable
                    false,        // auto-deleted
                    false,        // internal
                    false,        // no-wait
                    nil,          // arguments
            )
            failOnError(err, "Failed to declare an exchange")
    
            body := bodyFrom(os.Args)
            err = ch.Publish(
                    "logs_topic",          // exchange
                    severityFrom(os.Args), // routing key
                    false, // mandatory
                    false, // immediate
                    amqp.Publishing{
                            ContentType: "text/plain",
                            Body:        []byte(body),
                    })
            failOnError(err, "Failed to publish a message")
    
            log.Printf(" [x] Sent %s", body)
    }
    
    func bodyFrom(args []string) string {
            var s string
            if (len(args) < 3) || os.Args[2] == "" {
                    s = "hello"
            } else {
                    s = strings.Join(args[2:], " ")
            }
            return s
    }
    
    func severityFrom(args []string) string {
            var s string
            if (len(args) < 2) || os.Args[1] == "" {
                    s = "anonymous.info"
            } else {
                    s = os.Args[1]
            }
            return s
    }
    
    • receiveLogsTopic.go
    func main() {
            conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(err, "Failed to connect to RabbitMQ")
            defer conn.Close()
    
            ch, err := conn.Channel()
            failOnError(err, "Failed to open a channel")
            defer ch.Close()
    
            err = ch.ExchangeDeclare(
                    "logs_topic", // name
                    "topic",      // type
                    true,         // durable
                    false,        // auto-deleted
                    false,        // internal
                    false,        // no-wait
                    nil,          // arguments
            )
            failOnError(err, "Failed to declare an exchange")
    
            q, err := ch.QueueDeclare(
                    "",    // name
                    false, // durable
                    false, // delete when usused
                    true,  // exclusive
                    false, // no-wait
                    nil,   // arguments
            )
            failOnError(err, "Failed to declare a queue")
    
            if len(os.Args) < 2 {
                    log.Printf("Usage: %s [binding_key]...", os.Args[0])
                    os.Exit(0)
            }
            for _, s := range os.Args[1:] {
                    log.Printf("Binding queue %s to exchange %s with routing key %s",
                            q.Name, "logs_topic", s)
                    err = ch.QueueBind(
                            q.Name,       // queue name
                            s,            // routing key
                            "logs_topic", // exchange
                            false,
                            nil)
                    failOnError(err, "Failed to bind a queue")
            }
    
            msgs, err := ch.Consume(
                    q.Name, // queue
                    "",     // consumer
                    true,   // auto ack
                    false,  // exclusive
                    false,  // no local
                    false,  // no wait
                    nil,    // args
            )
            failOnError(err, "Failed to register a consumer")
    
            forever := make(chan bool)
    
            go func() {
                    for d := range msgs {
                            log.Printf(" [x] %s", d.Body)
                    }
            }()
    
            log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
            <-forever
    }
    
    • 接收所有的日志:go run receiveLogsTopic.go "#"
    • 接收来自kern的所有日志:go run receiveLogsTopic.go "kern.*"
    • 接收关于critical的日志:go run receiveLogsTopic.go "*.critical"
    • 创建多个binding:go run receiveLogsTopic.go "kern.*" "*.critical"
    • 启动发送消息的脚本:go run receiveLogsTopic.go "kern.critical" "A critical kernel error"
  • 相关阅读:
    Python进阶03 模块
    Python进阶02 文本文件的输入输出
    Python进阶01 词典
    Python基础10 反过头来看看
    Python基础09 面向对象的进一步拓展
    Python基础08 面向对象的基本概念
    Python基础07 函数
    Vuex源码分析(转)
    Vue2.x双向数据绑定
    Angular2的双向数据绑定
  • 原文地址:https://www.cnblogs.com/develop-SZT/p/10706599.html
Copyright © 2011-2022 走看看