zoukankan      html  css  js  c++  java
  • RabbitMQ官方教程二 Work Queues(GOLANG语言实现)

    RabbitMQ官方教程二 Work Queues(GOLANG语言实现)

    架构图

    在第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。 在这一部分中,我们将创建一个工作队列,该队列将用于在多个worker之间分配耗时的任务。

    工作队列(又称任务队列)的主要思路是避免立即执行资源密集型任务(比如耗时较长的邮件发送、文件处理等),而不得不等待它完成。 相反,我们安排任务在以后完成(异步完成)。 我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。 当您运行许多worker时,他们将共享任务。

    这个概念在Web应用程序中特别有用,因为在Web应用程序中,不可能在较短的HTTP请求时间内处理复杂的任务。

    准备

    在本教程的前一部分,我们发送了一条包含“ Hello World!”的消息。 现在,我们将发送代表复杂任务的字符串。 我们没有真实的任务,例如要调整大小的图像或要渲染的pdf文件,因此我们假装耗时任务-使用time.Sleep函数来伪造它。 我们将字符串中的点数作为它的复杂度。 每个点将占“工作”的一秒。 例如,Hello ...描述的虚拟任务将花费三秒钟。

    我们将稍微修改上一个示例中的send.go代码,以允许从命令行发送任意消息。 该程序会将任务安排到我们的工作队列中,因此我们将其命名为new_task.go:

    #new_task.go
    
    package main
    
    import (
    	"github.com/streadway/amqp"
    	"log"
    	"os"
    	"strings"
    )
    
    func main(){
    	// 连接RabbitMQ服务器
    	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    	// 创建一个channel
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	q, err  := ch.QueueDeclare(
    		"hello",			// 队列名称
    		false,			// 是否持久化
    		false,		// 是否自动删除
    		false,			// 是否独立
    		false,nil,
    		)
    	failOnError(err, "Failed to declare a queue")
    
    	body := bodyForm(os.Args)
    	err = ch.Publish(
    		"",     // exchange
    		q.Name, // routing key
    		false,  // mandatory
    		false,  // immediate
    		amqp.Publishing {
    			DeliveryMode:amqp.Persistent,
    			ContentType: "text/plain",
    			Body:        []byte(body),
    		})
    	failOnError(err, "Failed to publish a message")
    	log.Printf(" [x] Sent %s", body)
    }
    
    func bodyForm(args []string) string{
    	var s string
    	if (len(args) < 2) || os.Args[1] == "" {
    		s = "hello"
    	} else {
    		s = strings.Join(args[1:], " ")
    	}
    	return s
    }
    
    // 帮助函数检测每一个amqp调用
    func failOnError(err error, msg string)  {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    
    
    #worker.go
    
    package main
    
    import (
    	"bytes"
    	"github.com/streadway/amqp"
    	"log"
    	"time"
    )
    
    func main(){
    	// 连接RabbitMQ服务器
    	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    	// 创建一个channel
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    	// 监听队列
    	q, err  := ch.QueueDeclare(
    		"hello",			// 队列名称
    		false,			// 是否持久化
    		false,		// 是否自动删除
    		false,			// 是否独立
    		false,nil,
    	)
    	failOnError(err, "Failed to declare a queue")
    	// 消费队列
    	msgs, err := ch.Consume(
    		q.Name, // queue
    		"",     // consumer
    		true,   // auto-ack
    		false,  // exclusive
    		false,  // no-local
    		false,  // no-wait
    		nil,    // args
    	)
    	failOnError(err, "Failed to register a consumer")
    
    	forever := make(chan bool)
    
    	go func() {
    		for d := range msgs {
    			log.Printf("Received a message: %s", d.Body)
    			// 统计string中的`.`来表示执行时间
    			dot_count := bytes.Count(d.Body, []byte("."))
    			t := time.Duration(dot_count)
    			time.Sleep(t * time.Second)
    			log.Printf("Done")
    		}
    	}()
    
    	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    	<-forever
    }
    
    // 帮助函数检测每一个amqp调用
    func failOnError(err error, msg string)  {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    

    开启两个worker准备接受队列中数据

    # shell 1
    go run worker.go
    # => [*] Waiting for messages. To exit press CTRL+C
    
    
    # shell 2
    go run worker.go
    # => [*] Waiting for messages. To exit press CTRL+C
    

    执行new_task.go进行发送数据

    
    # shell 3
    go run new_task.go First message.
    go run new_task.go Second message..
    go run new_task.go Third message...
    go run new_task.go Fourth message....
    go run new_task.go Fifth message.....
    
    

    得到结果如下
    结果图

    默认情况下,RabbitMQ将每个消息依次发送给下一个消费者。 平均而言,每个消费者都会收到相同数量的消息。 这种分发消息的方式称为循环。

    消息确认

    执行任务可能需要几秒钟。 您可能想知道,如果其中一个消费者开始一项漫长的任务而仅部分完成而死掉,会发生什么情况。 使用我们当前的代码,RabbitMQ一旦向消费者发送了一条消息,便立即将其标记为删除。 在这种情况下,如果您杀死一个worker,我们将丢失正在处理的消息。 我们还将丢失所有发送给该特定工作人员但尚未处理的消息。

    但是我们不想丢失任何任务。 如果一个worker死亡,我们希望将任务交付给另一个worker。

    为了确保消息永不丢失,RabbitMQ支持消息确认。 消费者发送回一个消息确认,告知RabbitMQ特定的消息已被接收,处理,并且RabbitMQ可以自由删除它。

    如果使用者宕机(其通道已关闭,连接已关闭或TCP连接丢失)而没有发送确认,RabbitMQ将了解消息未完全处理,并将重新排队。 如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。 这样,您可以确保即使worker偶尔宕机也不会丢失任何消息。

    RabbitMQ没有任何消息超时设置; 消费者死亡时,RabbitMQ将重新传递消息。 即使处理一条消息花费非常非常长的时间也没关系。

    在本教程中,我们将通过为“ auto-ack”参数传递一个false来使用手动消息确认,然后一旦在任务完成后使用d.Ack(false)从worker发送适当的确认。

    #worker.go修改部门代码
    msgs, err := ch.Consume(
      q.Name, // queue
      "",     // consumer
      false,  // 是否自动消息确认
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
    )
    failOnError(err, "Failed to register a consumer")
    
    forever := make(chan bool)
    
    go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        dot_count := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dot_count)
        time.Sleep(t * time.Second)
        log.Printf("Done")
        d.Ack(false)
      }
    }()
    
    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
    
    

    使用此代码,我们可以确保,即使您在处理消息时使用CTRL + C杀死worker,也不会丢失任何信息。 worker后不久,所有未确认的消息将重新发送。

    消息确认必须与消息发送在同一channel上。 尝试使用其他channel进行确认将导致通道级协议异常。

    忘记消息确认

    忘记设置消息确认是一个常见的错误。 这是一个很容易犯的错误,但是后果很严重。 当您的客户端退出时,消息将被重新发送(可能看起来像是随机重新发送),但是RabbitMQ将占用越来越多的内存,因为它将无法释放任何未确认的消息。

    为了调试这种错误,您可以使用rabbitmqctl打印messages_unacknowledged字段:

    sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    

    消息持久化

    我们已经学会了如何确保即使消费者死亡,任务也不会丢失。 但是,如果RabbitMQ服务器停止,我们的任务仍然会丢失。

    RabbitMQ退出或宕机时,它将丢失队列和消息,除非您告知不要这样做。 要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。

    首先,我们需要确保RabbitMQ永远不会丢失我们的队列。 为此,我们需要将其声明为持久的:

    q, err := ch.QueueDeclare(
      "hello",      // name
      true,         // durable
      false,        // delete when unused
      false,        // exclusive
      false,        // no-wait
      nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")
    
    

    尽管此命令本身是正确的,但在我们当前的设置中将无法使用。 这是因为我们已经定义了一个名为hello的队列,该队列并不持久。 RabbitMQ不允许您使用不同的参数重新定义现有队列,并且将向尝试执行此操作的任何程序返回错误。 但是有一个快速的解决方法-让我们声明一个名称不同的队列,例如task_queue:

    q, err := ch.QueueDeclare(
      "task_queue", // name
      true,         // durable
      false,        // delete when unused
      false,        // exclusive
      false,        // no-wait
      nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")
    

    这种持久的选项更改需要同时应用于生产者代码和消费者代码。

    在这一点上,我们确保即使RabbitMQ重新启动,task_queue队列也不会丢失。 现在,我们需要使用amqp.Persistent选项amqp.Publishing来将消息标记为持久消息。

    err = ch.Publish(
      "",           // exchange
      q.Name,       // routing key
      false,        // mandatory
      false,
      amqp.Publishing {
        DeliveryMode: amqp.Persistent,
        ContentType:  "text/plain",
        Body:         []byte(body),
    })
    

    关于消息持久性的说明

    将消息标记为持久性并不能完全保证不会丢失消息。 尽管它告诉RabbitMQ将消息保存到磁盘,但是RabbitMQ接受消息并且尚未保存消息时,还有很短的时间。 而且,RabbitMQ不会对每条消息都执行fsync(2)-它可能只是保存到缓存中,而没有真正写入磁盘。 持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。 如果您需要更强有力的保证,则可以使用publisher confirms。

    公平分发

    您可能已经注意到,调度仍然无法完全按照我们的要求进行。 例如,在有两名worker的情况下,当所有奇数的消息都很重,偶数消息很轻时,一个worker将一直忙碌而另一位worker将几乎不做任何工作。 但是,RabbitMQ对此一无所知,并且仍将平均分配消息。

    发生这种情况是因为RabbitMQ在消息进入队列时才调度消息。 它不会查看使用者的未确认消息数。 它只是盲目地将每第n条消息发送给第n个使用者。

    结果图

    为了解决这个问题,我们可以将预取计数的值设置为1。这告诉RabbitMQ一次不要给一个worker发送多条消息。 换句话说,在处理并确认上一条消息之前,不要将新消息发送给worker。 而是将其分派给不忙的下一个worker。

    err = ch.Qos(
      1,     // prefetch count
      0,     // prefetch size
      false, // global
    )
    failOnError(err, "Failed to set QoS")
    

    关于队列大小的注意事项

    如果所有工作人员都忙,您的队列就满了。 您将需要留意这一点,也许会增加更多的工作人员,或者有其他一些策略。

    最终代码

    new_task.go

    package main
    
    import (
            "log"
            "os"
            "strings"
    
            "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
            if err != nil {
                    log.Fatalf("%s: %s", msg, err)
            }
    }
    
    func main() {
            conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(err, "Failed to connect to RabbitMQ")
            defer conn.Close()
    
            ch, err := conn.Channel()
            failOnError(err, "Failed to open a channel")
            defer ch.Close()
    
            q, err := ch.QueueDeclare(
                    "task_queue", // name
                    true,         // durable
                    false,        // delete when unused
                    false,        // exclusive
                    false,        // no-wait
                    nil,          // arguments
            )
            failOnError(err, "Failed to declare a queue")
    
            body := bodyFrom(os.Args)
            err = ch.Publish(
                    "",           // exchange
                    q.Name,       // routing key
                    false,        // mandatory
                    false,
                    amqp.Publishing{
                            DeliveryMode: amqp.Persistent,
                            ContentType:  "text/plain",
                            Body:         []byte(body),
                    })
            failOnError(err, "Failed to publish a message")
            log.Printf(" [x] Sent %s", body)
    }
    
    func bodyFrom(args []string) string {
            var s string
            if (len(args) < 2) || os.Args[1] == "" {
                    s = "hello"
            } else {
                    s = strings.Join(args[1:], " ")
            }
            return s
    }
    

    worker.go

    
    package main
    
    import (
            "bytes"
            "github.com/streadway/amqp"
            "log"
            "time"
    )
    
    func failOnError(err error, msg string) {
            if err != nil {
                    log.Fatalf("%s: %s", msg, err)
            }
    }
    
    func main() {
            conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(err, "Failed to connect to RabbitMQ")
            defer conn.Close()
    
            ch, err := conn.Channel()
            failOnError(err, "Failed to open a channel")
            defer ch.Close()
    
            q, err := ch.QueueDeclare(
                    "task_queue", // name
                    true,         // durable
                    false,        // delete when unused
                    false,        // exclusive
                    false,        // no-wait
                    nil,          // arguments
            )
            failOnError(err, "Failed to declare a queue")
    
            err = ch.Qos(
                    1,     // prefetch count
                    0,     // prefetch size
                    false, // global
            )
            failOnError(err, "Failed to set QoS")
    
            msgs, err := ch.Consume(
                    q.Name, // queue
                    "",     // consumer
                    false,  // auto-ack
                    false,  // exclusive
                    false,  // no-local
                    false,  // no-wait
                    nil,    // args
            )
            failOnError(err, "Failed to register a consumer")
    
            forever := make(chan bool)
    
            go func() {
                    for d := range msgs {
                            log.Printf("Received a message: %s", d.Body)
                            dot_count := bytes.Count(d.Body, []byte("."))
                            t := time.Duration(dot_count)
                            time.Sleep(t * time.Second)
                            log.Printf("Done")
                            d.Ack(false)
                    }
            }()
    
            log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
            <-forever
    }
    
  • 相关阅读:
    SpringBoot08-缓存
    Spring注解-自动装配(三)
    Spring注解-生命周期与属性赋值(二)
    Spring注解-组件注册(一)
    剖析SpringMVC流程与整合(八)
    SpringMVC视图解析与配置(六)
    SpringMVC的其他功能(七)
    简单了解SpringMVC(五)
    Java动态代理
    Spring事务(四)
  • 原文地址:https://www.cnblogs.com/zhouqi666/p/12044315.html
Copyright © 2011-2022 走看看