RabbitMQ官方教程二 Work Queues(GOLANG语言实现)
在第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。 在这一部分中,我们将创建一个工作队列,该队列将用于在多个worker之间分配耗时的任务。
工作队列(又称任务队列)的主要思路是避免立即执行资源密集型任务(比如耗时较长的邮件发送、文件处理等),而不得不等待它完成。 相反,我们安排任务在以后完成(异步完成)。 我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。 当您运行许多worker时,他们将共享任务。
这个概念在Web应用程序中特别有用,因为在Web应用程序中,不可能在较短的HTTP请求时间内处理复杂的任务。
准备
在本教程的前一部分,我们发送了一条包含“ Hello World!”的消息。 现在,我们将发送代表复杂任务的字符串。 我们没有真实的任务,例如要调整大小的图像或要渲染的pdf文件,因此我们假装耗时任务-使用time.Sleep函数来伪造它。 我们将字符串中的点数作为它的复杂度。 每个点将占“工作”的一秒。 例如,Hello ...描述的虚拟任务将花费三秒钟。
我们将稍微修改上一个示例中的send.go代码,以允许从命令行发送任意消息。 该程序会将任务安排到我们的工作队列中,因此我们将其命名为new_task.go:
#new_task.go
package main
import (
"github.com/streadway/amqp"
"log"
"os"
"strings"
)
func main(){
// 连接RabbitMQ服务器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否独立
false,nil,
)
failOnError(err, "Failed to declare a queue")
body := bodyForm(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
DeliveryMode:amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyForm(args []string) string{
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
// 帮助函数检测每一个amqp调用
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
#worker.go
package main
import (
"bytes"
"github.com/streadway/amqp"
"log"
"time"
)
func main(){
// 连接RabbitMQ服务器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 监听队列
q, err := ch.QueueDeclare(
"hello", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否独立
false,nil,
)
failOnError(err, "Failed to declare a queue")
// 消费队列
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 统计string中的`.`来表示执行时间
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
// 帮助函数检测每一个amqp调用
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
开启两个worker准备接受队列中数据
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
执行new_task.go进行发送数据
# shell 3
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....
得到结果如下
默认情况下,RabbitMQ将每个消息依次发送给下一个消费者。 平均而言,每个消费者都会收到相同数量的消息。 这种分发消息的方式称为循环。
消息确认
执行任务可能需要几秒钟。 您可能想知道,如果其中一个消费者开始一项漫长的任务而仅部分完成而死掉,会发生什么情况。 使用我们当前的代码,RabbitMQ一旦向消费者发送了一条消息,便立即将其标记为删除。 在这种情况下,如果您杀死一个worker,我们将丢失正在处理的消息。 我们还将丢失所有发送给该特定工作人员但尚未处理的消息。
但是我们不想丢失任何任务。 如果一个worker死亡,我们希望将任务交付给另一个worker。
为了确保消息永不丢失,RabbitMQ支持消息确认。 消费者发送回一个消息确认,告知RabbitMQ特定的消息已被接收,处理,并且RabbitMQ可以自由删除它。
如果使用者宕机(其通道已关闭,连接已关闭或TCP连接丢失)而没有发送确认,RabbitMQ将了解消息未完全处理,并将重新排队。 如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。 这样,您可以确保即使worker偶尔宕机也不会丢失任何消息。
RabbitMQ没有任何消息超时设置; 消费者死亡时,RabbitMQ将重新传递消息。 即使处理一条消息花费非常非常长的时间也没关系。
在本教程中,我们将通过为“ auto-ack”参数传递一个false来使用手动消息确认,然后一旦在任务完成后使用d.Ack(false)从worker发送适当的确认。
#worker.go修改部门代码
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // 是否自动消息确认
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
使用此代码,我们可以确保,即使您在处理消息时使用CTRL + C杀死worker,也不会丢失任何信息。 worker后不久,所有未确认的消息将重新发送。
消息确认必须与消息发送在同一channel上。 尝试使用其他channel进行确认将导致通道级协议异常。
忘记消息确认
忘记设置消息确认是一个常见的错误。 这是一个很容易犯的错误,但是后果很严重。 当您的客户端退出时,消息将被重新发送(可能看起来像是随机重新发送),但是RabbitMQ将占用越来越多的内存,因为它将无法释放任何未确认的消息。
为了调试这种错误,您可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。 但是,如果RabbitMQ服务器停止,我们的任务仍然会丢失。
RabbitMQ退出或宕机时,它将丢失队列和消息,除非您告知不要这样做。 要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保RabbitMQ永远不会丢失我们的队列。 为此,我们需要将其声明为持久的:
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
尽管此命令本身是正确的,但在我们当前的设置中将无法使用。 这是因为我们已经定义了一个名为hello的队列,该队列并不持久。 RabbitMQ不允许您使用不同的参数重新定义现有队列,并且将向尝试执行此操作的任何程序返回错误。 但是有一个快速的解决方法-让我们声明一个名称不同的队列,例如task_queue:
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
这种持久的选项更改需要同时应用于生产者代码和消费者代码。
在这一点上,我们确保即使RabbitMQ重新启动,task_queue队列也不会丢失。 现在,我们需要使用amqp.Persistent选项amqp.Publishing来将消息标记为持久消息。
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
关于消息持久性的说明
将消息标记为持久性并不能完全保证不会丢失消息。 尽管它告诉RabbitMQ将消息保存到磁盘,但是RabbitMQ接受消息并且尚未保存消息时,还有很短的时间。 而且,RabbitMQ不会对每条消息都执行fsync(2)-它可能只是保存到缓存中,而没有真正写入磁盘。 持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。 如果您需要更强有力的保证,则可以使用publisher confirms。
公平分发
您可能已经注意到,调度仍然无法完全按照我们的要求进行。 例如,在有两名worker的情况下,当所有奇数的消息都很重,偶数消息很轻时,一个worker将一直忙碌而另一位worker将几乎不做任何工作。 但是,RabbitMQ对此一无所知,并且仍将平均分配消息。
发生这种情况是因为RabbitMQ在消息进入队列时才调度消息。 它不会查看使用者的未确认消息数。 它只是盲目地将每第n条消息发送给第n个使用者。
为了解决这个问题,我们可以将预取计数的值设置为1。这告诉RabbitMQ一次不要给一个worker发送多条消息。 换句话说,在处理并确认上一条消息之前,不要将新消息发送给worker。 而是将其分派给不忙的下一个worker。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
关于队列大小的注意事项
如果所有工作人员都忙,您的队列就满了。 您将需要留意这一点,也许会增加更多的工作人员,或者有其他一些策略。
最终代码
new_task.go
package main
import (
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
worker.go
package main
import (
"bytes"
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}