zoukankan      html  css  js  c++  java
  • rabbitmq消息队列——"路由"

    在之前的教程中,我们创建了一个简单的日志系统。我们能够向许多交换器转发日志消息。

    在本教程中,我们将添加一个功能——我们让它仅仅接收我们感兴趣的日志类别。举例:我们 实现仅将严重级别的错误日志写入磁盘(为了节省磁盘空间),其余日志级别的日志直接打印到控制台。

    绑定

    之前的章节中我们已经创建过绑定,你可能还会记得:

    err = ch.QueueBind(
      q.Name, // queue name
      "",     // routing key
      "logs", // exchange
      false,
      nil)
    

    绑定是用来维系交换器和队列关系的,这可以被简单地理解为:队列仅仅对从交换器中传的消息感兴趣。

    绑定有个额外参数叫做routing_key,为了避免与Channel.Publish方法中的参数相混淆,我们称之为binding key(绑定键)。使用绑定键创建绑定如下:

    err = ch.QueueBind(
      q.Name,    // queue name
      "black",   // routing key
      "logs",    // exchange
      false,
      nil)
    

    绑定键的含义取决于交换器的类型。我们之前使用的fanout类型的交换器,就会直接忽略这个参数。

    Direct型交换器

    我们之前的教程中的日志系统是广播所有的消息到所有消费者。我们希望以此拓展来实现根据消息严重性来过滤消息。比如我们希望 写日志到硬盘的代码仅仅接收严重级别的,不要浪费磁盘存储在warning或者info级别的日志。

    之前使用的是fanout类型交换器,没有更好的拓展性或者说灵活性——它只能盲目的广播。

    现在 使用direct型交换器替代。Direct型的路由算法 比较简单——消息会被派发到某个队列,该队列的绑定键恰好和消息的路由键一致。

    为了阐述,考虑如下设置:

    image

    该设置中,可以看到direct型的交换器X被绑定到了两个队列:Q1、Q2。Q1使用绑定键orange绑定,Q2包含两个绑定键:black和green。

    基于如上设置的话,使用路由键orange发布的消息会被路由到Q1队列,而使用black或者green路由键的消息均会被路由到Q2,所有其余消息将被丢弃。

    备注:这里的交换器X和队列的绑定是多对多的关系,也就是说一个交换器可以到绑定多个队列,一个队列也可以被多个交换器绑定,消息只会被路由一次,不能因为两个绑定键都匹配上了路由键消息就会被路由两次,这种是不存在的。

    多个绑定

    image

    用相同的绑定键去绑定多个队列是完全合法的,我们可以再添加一个black绑定键来绑定X和Q1,这样Q1和Q2都使用black绑定到了交换器X,这其实和fanout类型的交换器直接绑定到队列Q1、Q2功能相同:使用black路由键的消息会被直接路由到Q1和Q2。

    发送日志

    我们将使用该模型来构建日志系统。使用direct型的交换器替换fanout型的,我们将日志的严重级别作为路由键,这样的话接收端程序可以选择日志接收级别进行接收,首先聚焦下日志发送端:

    首先创建一个交换器:

    err = ch.ExchangeDeclare(
      "logs_direct", // name
      "direct",      // type
      true,          // durable
      false,         // auto-deleted
      false,         // internal
      false,         // no-wait
      nil,           // arguments
    )
    

    然后是发送消息:

    err = ch.ExchangeDeclare(
      "logs_direct", // name
      "direct",      // type
      true,          // durable
      false,         // auto-deleted
      false,         // internal
      false,         // no-wait
      nil,           // arguments
    )
    failOnError(err, "Failed to declare an exchange")
    
    body := bodyFrom(os.Args)
    err = ch.Publish(
      "logs_direct",         // exchange
      severityFrom(os.Args), // routing key
      false, // mandatory
      false, // immediate
      amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body),
      })
    

    为了简单起见,我们假设日志严重级别如下:'info', 'warning', 'error'。

    订阅

    接收还和之前章节接收一样,只有一个例外:我们将为每一个感兴趣的严重级别创建一个绑定:

    q, err := ch.QueueDeclare(
      "",    // name
      false, // durable
      false, // delete when usused
      true,  // exclusive
      false, // no-wait
      nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")
    
    if len(os.Args) < 2 {
      log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
      os.Exit(0)
    }
    for _, s := range os.Args[1:] {
      log.Printf("Binding queue %s to exchange %s with routing key %s",
         q.Name, "logs_direct", s)
      err = ch.QueueBind(
        q.Name,        // queue name
        s,             // routing key
        "logs_direct", // exchange
        false,
        nil)
      failOnError(err, "Failed to bind a queue")
    }
    

    糅合在一起

    image

    发送端:

    // rabbitmq_4_emit_log_direct.go project main.go
    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    	"strings"
    
    	"github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    		panic(fmt.Sprintf("%s: %s", msg, err))
    	}
    }
    
    func main() {
    	//链接队列服务
    	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	//声明一个channel
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	//声明一个direct类型交换器
    	err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)
    	failOnError(err, "Failed to declare an exchange")
    
    	body := bodyFrom(os.Args)
    	ch.Publish("logs_direct", severityFrom(os.Args), false, false, amqp.Publishing{
    		ContentType: "text/plain",
    		Body:        []byte(body),
    	})
    	failOnError(err, "Failed to publish a message")
    
    	log.Printf(" [x] Sent %s", body)
    
    }
    
    //接收消息发送内容
    func bodyFrom(args []string) string {
    	var s string
    	if (len(args) < 3) || os.Args[2] == "" {
    		s = "hello"
    	} else {
    		s = strings.Join(args[2:], " ")
    	}
    	return s
    }
    
    //接收日志级别,作为路由键使用
    func severityFrom(args []string) string {
    	var s string
    	if len(args) < 2 || args[1] == "" {
    		s = "info"
    	} else {
    		s = args[1]
    	}
    	return s
    }
    

    接收端:

    // rabbitmq_4_receive_logs_direct.go project main.go
    package main
    
    import (
    	"fmt"
    	"log"
    	"os"
    
    	"github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    		panic(fmt.Sprintf("%s: %s", msg, err))
    	}
    }
    
    func main() {
    	//链接队列服务
    	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	//声明一个channel
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	//声明一个direct类型交换器
    	err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)
    	failOnError(err, "Failed to declare an exchange")
    
    	//声明一个队列
    	q, err := ch.QueueDeclare("", false, false, true, false, nil)
    	failOnError(err, "Failed to declare a queue")
    
    	//判断cmd窗口接收参数是否足够
    	if len(os.Args) < 2 {
    		log.Printf("Usage:%s [info] [warning] [error]", os.Args[0])
    		os.Exit(0)
    	}
    
    	//cmd窗口输入的多个日志级别,分别循环处理—进行绑定
    	for _, s := range os.Args[1:] {
    		log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s)
    		ch.QueueBind(q.Name, s, "logs_direct", false, nil)
    		failOnError(err, "Failed to bind a queue")
    	}
    
    	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
    	failOnError(err, "Failed to register a consumer")
    
    	forever := make(chan bool)
    	go func() {
    		for d := range msgs {
    			log.Printf(" [x] %s", d.Body)
    		}
    	}()
    	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    	<-forever
    }
    

    如果您只想保存“警告”和“错误”(而不是“信息”)日志消息到文件,只需要打开一个控制台然后输入:

    go run receive_logs_direct.go warning error > logs_from_rabbit.log

    如果你想看到所有的日志消息在你的屏幕上,打开一个新的终端,输入:

    go run receive_logs_direct.go info warning error

    发出一个错误日志消息类型如下:

    go run emit_log_direct.go error "Run. Run. Or it will explode."

    可以观察到:

    消息可以进行分类接收了, 只有error级别的消息才会被存入log日志文件,而info、warning级别的都不存入。

    实际效果如下:

    image

    image

    image

  • 相关阅读:
    Kafka日志段源码分析
    Kafka日志结构概览
    LDAP统一身份认证解读及实践
    Keycloak集成三方身份提供者的注销流程
    Keycloak会话管理-refreshToken
    Cas校验INVALID_TICKET-not recognized
    如何获取Docker容器的root权限
    OIDC-code to token
    Newrelic集成wildfly报NoClassDefFoundError
    Cookie深入详解
  • 原文地址:https://www.cnblogs.com/vipzhou/p/6070104.html
Copyright © 2011-2022 走看看