zoukankan      html  css  js  c++  java
  • RabbitMqBase消息队列小结

    RabbitMqBase消息队列小结

    虽然这个消息队列我只是知道应用场景在哪里,还没有实际操作到,但是原理还是要知道的。这些知识点就像珍珠,万一哪天就用到了。在没具体学习之前,我一直在想: 这玩意不就是个队列吗。还能玩出花来?结果,一研究,还真的打脸了。

    首先,这玩意用的是AMQP协议,并且只是占用了一个tcp连接,然后就会问到,多个消息,那不得卡死啊,不。他用了channel,这个东西可以有多个,可以理解为他开辟了线程,这样就避免了网络连接的资源浪费。

    消息流程

    消息 => 交换机 => 通过绑定的key => 队列 => 消费者

    交换机发布消息的方式有

    • 直接发送direct: 消息一根肠子到队列
    • 广播fanout: 交换机发布消息,无需指定key,绑定在交换机上面的队列就会知道消息
    • 匹配消息topic: 交换机发布消息根据规则匹配到满足规则的key上, 和正则匹配差不多一个意思

    延时队列

    对于我们想发布一个定时任务,我们不想让这个任务占用我们自己的电脑资源,就要发布完任务,然后就关电脑睡觉。这时候,我们就需要一个mq服务器,把消息塞进去,首先想的就是让消息自己根据时间戳排序,不好意思,想多了,很难实现。那就换一种思路,消息设置过期时间,然后就把消息发到别的转换机上,再去发到指定队列上,任务监听队列就好了。

    代码实现

    • 发布消息send.go

      package dead_q
      
      import (
      	"github.com/streadway/amqp"
      	"go-mq/conf"
      	"log"
      )
      
      func failOnError(err error, msg string) {
      	if err != nil {
      		log.Fatalf("%s: %s", msg, err)
      	}
      }
      
      func Send() {
      	url := conf.Host
      	conn, err := amqp.Dial(url)
      	failOnError(err, "Failed to connect to RabbitMQ")
      	defer conn.Close()
      
      	ch, err := conn.Channel()
      	failOnError(err, "Failed to open a channel")
      	defer ch.Close()
      
      	body := "dead-last"
      	// 将消息发送到延时队列上
      	err = ch.Publish(
      		"delay-exchange",                 // exchange 这里为空则不选择 exchange
      		"",         // routing key
      		true,              // mandatory
      		false,              // immediate
      		amqp.Publishing{
      			ContentType: "text/plain",
      			Body:        []byte(body),
      			Expiration: "5000",    // 设置五秒的过期时间
      		})
      	failOnError(err, "Failed to publish a message")
      
      	log.Printf(" [x] Sent %s", body)
      }
      
    • 第一个交换机为了设置超时时间和下一个交换机rec.go

      package rec
      
      import (
      	"github.com/streadway/amqp"
      	"go-mq/conf"
      	"log"
      )
      
      func failOnError(err error, msg string) {
      	if err != nil {
      		log.Fatalf("%s: %s", msg, err)
      	}
      }
      
      func Rec() {
      	url := conf.Host
      	conn, err := amqp.Dial(url)
      	failOnError(err, "Failed to connect to RabbitMQ")
      	defer conn.Close()
      
      	ch, err := conn.Channel()
      	failOnError(err, "Failed to open a channel")
      	defer ch.Close()
      
      	// 声明一个主要使用的 exchange
      	err = ch.ExchangeDeclare(
      		"delay-exchange",   // name
      		"fanout", // type
      		true,     // durable
      		false,    // auto-deleted
      		false,    // internal
      		false,    // no-wait
      		nil,      // arguments
      	)
      	failOnError(err, "Failed to declare an exchange")
      
      	/**
      	 * 注意,这里是重点!!!!!
      	 * 声明一个延时队列, ß我们的延时消息就是要发送到这里
      	 */
      	q, errDelay := ch.QueueDeclare(
      		"delay-qune",    // name
      		true, // durable
      		false, // delete when unused
      		false,  // exclusive
      		false, // no-wait
      		amqp.Table{
      			// 当消息过期时把消息发送到 logs 这个 exchange
      			"x-dead-letter-exchange":"dead-exchange",
      		},   // arguments
      	)
      	failOnError(errDelay, "Failed to declare a delay_queue")
      
      	err = ch.QueueBind(
      		q.Name, // queue name, 这里指的是 test_logs
      		"",     // routing key
      		"delay-exchange", // exchange
      		false,
      		nil)
      	failOnError(err, "Failed to bind a queue")
      
      	// 这里监听的是 test_logs
      	msgs, err := ch.Consume(
      		q.Name, // queue name, 这里指的是 test_logs
      		"",     // consumer
      		false,   // auto-ack
      		false,  // exclusive
      		false,  // no-local
      		false,  // no-wait
      		nil,    // args
      	)
      	failOnError(err, "Failed to register a consumer")
      
      	forever := make(chan bool)
      
      	go func() {
      		for d := range msgs {
      			log.Printf(" [x] %s", d.Body)
      			d.Ack(false)
      		}
      	}()
      
      	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
      	<-forever
      }
      
    • 最后的队列用于任务消费dedrec.go

      package dead_rec
      
      import (
      	"github.com/streadway/amqp"
      	"go-mq/conf"
      	"log"
      )
      
      func failOnError(err error, msg string) {
      	if err != nil {
      		log.Fatalf("%s: %s", msg, err)
      	}
      }
      
      func Rec() {
      	url := conf.Host
      	conn, err := amqp.Dial(url)
      	failOnError(err, "Failed to connect to RabbitMQ")
      	defer conn.Close()
      
      	ch, err := conn.Channel()
      	failOnError(err, "Failed to open a channel")
      	defer ch.Close()
      
      	// 声明一个主要使用的 exchange
      	err = ch.ExchangeDeclare(
      		"dead-exchange",   // name
      		amqp.ExchangeFanout, // type
      		true,     // durable
      		false,    // auto-deleted
      		false,    // internal
      		false,    // no-wait
      		nil,      // arguments
      	)
      	failOnError(err, "Failed to declare an exchange")
      
      	/**
      	 * 注意,这里是重点!!!!!
      	 * 声明一个延时队列, ß我们的延时消息就是要发送到这里
      	 */
      	arg2 := make(map[string]interface{})
      	arg2["x-dead-letter-exchange"] = "dead-q"
      	arg2["x-max-length"] = 3
      	delayQune, _ := ch.QueueDeclare(
      		"dead-qune",    // name
      		true, // durable
      		false, // delete when unused
      		false,  // exclusive
      		false, // no-wait
      		 arg2,   // arguments
      	)
      
      	err = ch.QueueBind(
      		delayQune.Name, // queue name, 这里指的是 test_logs
      		"",     // routing key
      		"dead-exchange", // exchange
      		false,
      		nil)
      	failOnError(err, "Failed to bind a queue")
      
      	// 这里监听的是 test_logs
      	msgs, err := ch.Consume(
      		delayQune.Name, // queue name, 这里指的是 test_logs
      		"",     // consumer
      		false,   // auto-ack
      		false,  // exclusive
      		false,  // no-local
      		false,  // no-wait
      		nil,    // args
      	)
      	failOnError(err, "Failed to register a consumer")
      
      	forever := make(chan bool)
      
      	go func() {
      		for d := range msgs {
      			log.Printf(" [x] %s", d.Body)
      			d.Ack(false)
      		}
      	}()
      
      	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
      	<-forever
      }
      

    附加知识点

    • 消费者公平消费

      if err := ch.Qos(1, 0, false); err != nil {
      	failOnError(err, "Failed to set QoS")
      }
      
    • 设置队列属性

      arg2 := make(map[string]interface{})
      arg2["x-dead-letter-exchange"] = "dead-q"
      arg2["x-max-length"] = 3
      delayQune, _ := ch.QueueDeclare(
          "dead-qune",    // name
          true, // durable
          false, // delete when unused
          false,  // exclusive
          false, // no-wait
          arg2,   // arguments
      )
      
  • 相关阅读:
    (转)130道ASP.NET面试题
    (转)c#对象内存模型
    (转)探讨:ASP.NET技术的学习顺序问题
    (转)ASP.NET缓存概念及其应用浅析
    Response.Redirect在新窗口打开网页
    转载 C# 序列化与反序列化意义详解
    简单进制转化
    简单成绩管理系统(没有存盘)
    kali不能ifconfig等简单命令
    蓝桥杯练习之开花(二分法查找)
  • 原文地址:https://www.cnblogs.com/maomaomaoge/p/14129435.html
Copyright © 2011-2022 走看看