前言:如果你对rabbitmq基本概念都不懂,可以移步此篇博文查阅消息队列RabbitMQ
一、单发单收
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。
单发单收模式下:一发一收
发送端只需要创建队列,然后向队列发送消息。
接收端也需要创建队列,因为如果接收端先启动,没有此队列就会报错,虽然发送端和接收端都创建此队列,但rabbitmq还是很智能的,它只会创建一次。
需要注意的地方:
1.发送端和接收端都需要创建同名队列
2.接收端指定从这个同名队列中接收消息
发送端
package main import ( "RabbitMQ" "strconv" "strings" "time" ) func main(){ //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 send_mq := rabbitMQ.New("链接","hello") i := 0 for{ time.Sleep(time.Second*5) greetings := []string{"Helloworld!",strconv.Itoa(i)} send_mq.Send("hello",strings.Join( greetings, " ")) i = i+1 } }
接收端
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() } }
二、工作队列Work Queue
工作队列和单发单收模式比起来,接收端可以有多个,接收端多了以后就会出现数据分配问题,发过来的数据到底该被哪个接收端接收,所以有两种模式:
公平分发:每个接收端接收消息的概率是相等的,发送端会循环依次给每个接收端发送消息,图一是公平分发,公平分发是rabbitmq默认模式。
公平派遣:保证接收端在处理完某个任务,并发送确认信息后,RabbitMQ才会向它推送新的消息,在此之间若是有新的消息话,将会被推送到其它接收端,若所有的接收端都在处理任务,那么就会等待,图二为公平派遣。
注意:使用公平派遣模式时,消费者设置atuoack为false,需要手动回复ack。
关闭自动应答是为了消费者逻辑处理结束前不接受下一条消息,这样哪个消费者逻辑处理的快,接收的消息自然就多,从而实现公平分发。
图一:
图二:
公平分发模式下的发送端和接收端
发送端
package main import ( "RabbitMQ" "strconv" "strings" "time" ) func main(){ //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 send_mq := rabbitMQ.New("链接","hello") i := 0 for{ time.Sleep(time.Second*5) greetings := []string{"Helloworld!",strconv.Itoa(i)} send_mq.Send("hello",strings.Join( greetings, " ")) i = i+1 } }
接收端1
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie 1 Received a message: %s", d.Body) } }() } }
接收端2
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie 1 Received a message: %s", d.Body) } }() } }
公平派遣模式下的发送端和接收端
公平派遣模式下发送端与公平分发相同,接收端只需要加一端配置代码
我们可以将预取计数设置为1。这告诉RabbitMQ一次不要给工人一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给不忙的下一个工作程序。
//配置队列参数 func (q *RabbitMQ)Qos(){ e := q.channel.Qos(1,0,false) failOnError(e,"无法设置QoS") }
接收端
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello") //配置公平派遣 receive_mq.Qos() for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie 2 Received a message: %s", d.Body) } }() } }
官方在这里介绍了出现以下两种问题的解决办法:
1.当接收者挂掉的时候,我们将丢失发送给接收端还没有处理的消息。
2.当rabbitmq服务器挂了,我们怎么保证我们的消息不丢失。
具体参考:https://www.rabbitmq.com/tutorials/tutorial-two-go.html
三、发布/订阅 Publish/Subscribe
发布订阅模式下多了一个概念:exchange,如何理解这个exchange,exchange的作用就是类似路由器,发送端发送消息需要带有routing key 就是路由键,服务器会根据路由键将消息从交换器路由到队列上去,所以发送端和接收端之间有了中介。
exchange有多个种类:direct,fanout,topic,header(非路由键匹配,功能和direct类似,很少用)。
首先介绍exchange下的fanout exchange,它会将发到这个exchange的消息广播到关注此exchange的所有接收端上。
广播模式下(1:N):
发送端连接到rabbitmq后,创建exchange,需要指定交换机的名字和类型,fanout为广播,然后向此exchange发送消息,其它就不用管了。
接收端的执行流程在程序备注中。
注意:广播模式下的exchange是发送端是不需要带路由键的哦。
package main import ( "RabbitMQ" "strconv" "strings" "time" ) func main(){ ch := rabbitMQ.Connect("amqp://user:password@ip:port/") rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout") i := 0 for{ time.Sleep(1) greetings := []string{"Helloworld!",strconv.Itoa(i)} ch.Publish("exchange1",strings.Join( greetings, " "),"") i = i+1 } }
接收端1
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先创建自己队列 // 2.创建交换机 // 3.将自己绑定到交换机上 // 4.接收交换机上发过来的消息 //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1") //2 //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout") //3 // 队列绑定到exchange receive_mq.Bind("exchange1","") //4 for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie1 Received a message: %s", d.Body) } }() } }
接收端2
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先创建自己队列 // 2.创建交换机 // 3.将自己绑定到交换机上 // 4.接收交换机上发过来的消息 //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2") //2 //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout") //3 // 队列绑定到exchange receive_mq.Bind("exchange1","") //4 for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie2 Received a message: %s", d.Body) } }() } }
四、路由Routing
路由模式其实就是全值匹配模式(direct),发送端发送消息需要带有路由键,就是下面发送端程序的routing key1,是一个字符串,发送端发给exchange,路由模式下的exchange会匹配这个路由键,如下面这个图,发送者发送时带有orange此路由键时,这条消息只会被转发给Q1队列,如果路由键没有匹配上的怎么办?,全值匹配,没有匹配到,那么所有接收者都接收不到消息,消息只会发送给匹配的队列,接收端的路由键是绑定exchange的时候用的。
注意:接收队列可以绑定多个路由键到exchange上,比如下面,当发送路由键为black,green,会被Q2接收。
发送端
package main import ( "RabbitMQ" "strconv" "strings" "time" ) func main(){ ch := rabbitMQ.Connect("amqp://user:password@ip:port/") rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct") i := 0 for{ time.Sleep(1) greetings := []string{"Helloworld!",strconv.Itoa(i)} if i%2 ==1 { //如果是奇数 ch.Publish("exchange",strings.Join( greetings, " "),"routing key1") } else{ ch.Publish("exchange",strings.Join( greetings, " "),"routing key2") } i = i+1 } }
接收端1
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先自己队列 // 2.创建交换机 // 3.将自己绑定到交换机上 // 4.接收交换机上发过来的消息 //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2") //2 //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct") //3 receive_mq.Bind("exchange","routing key1") //4 for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie1 Received a message: %s", d.Body) } }() } }
接收端2
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先自己队列 // 2.创建交换机 // 3.将自己绑定到交换机上 // 4.接收交换机上发过来的消息 //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2") //2 //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct") //3 receive_mq.Bind("exchange","routing key2") //4 for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie2 Received a message: %s", d.Body) } }() } }
五、Topic类型的exchange
前面的direct是全值匹配,那么topic就可以部分匹配,又可以全值匹配,比direct更加灵活。
消息发送到topic类型的exchange上时不能随意指定routing_key(一定是指由一系列由点号连接单词的字符串,单词可以是任意的,但一般都会与消息或多或少的有些关联)。Routing key的长度不能超过255个字节。
Binding key也一定要是同样的方式。Topic类型的exchange就像一个直接的交换:一个由生产者指定了确定routing key的消息将会被推送给所有Binding key能与之匹配的消费者。然而这种绑定有两种特殊的情况:
- *(星号):可以(只能)匹配一个单词
- #(井号):可以匹配多个单词(或者零个)
下边来举个例子:
在这个例子中,我们将会发送一些描述动物的消息。Routing key的第一个单词是描述速度的,第二个单词是描述颜色的,第三个是描述物种的:“<speed>.<colour>.<species>”。
这里我们创建三个Binding:Binding key为”*.orange.*”的Q1,和binding key为”*.*.rabbit”和”lazy.#”的Q2。
这些binding可以总结为:
- Q1对所有橘色的(orange)的动物感兴趣;
- Q2希望能拿到所有兔子的(rabbit)信息,还有比较懒惰的(lazy.#)动物信息。
一条以” quick.orange.rabbit”为routing key的消息将会推送到Q1和Q2两个queue上,routing key为“lazy.orange.elephant”的消息同样会被推送到Q1和Q2上。但如果routing key为”quick.orange.fox”的话,消息只会被推送到Q1上;routing key为”lazy.brown.fox”的消息会被推送到Q2上,routing key为"lazy.pink.rabbit”的消息也会被推送到Q2上,但同一条消息只会被推送到Q2上一次。
如果在发送消息时所指定的exchange和routing key在消费者端没有对应的exchange和binding key与之绑定的话,那么这条消息将会被丢弃掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing为”lazy.orange.male.rabbit”的消息,将会被推到Q2上。
Topic类型的exchange:
Topic类型的exchange是很强大的,也可以实现其它类型的exchange。
- 当一个队列被绑定为binding key为”#”时,它将会接收所有的消息,此时和fanout类型的exchange很像。
- 当binding key不包含”*”和”#”时,这时候就很像direct类型的exchange。
发送端
package main import ( "RabbitMQ" "time" ) func main(){ ch := rabbitMQ.Connect("amqp://user:password@ip/") rabbitMQ.NewExchange("amqp://user:password@ip/","exchange","topic") for{ time.Sleep(1) ch.Publish("exchange","hello world","lazy.brown.fox") } }
接收端
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先自己队列 // 2.创建交换机 // 3.将自己绑定到交换机上 // 4.接收交换机上发过来的消息 //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1") //2 //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic") //3 receive_mq.Bind("exchange","*.orange.*") //4 for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie1 Received a message: %s", d.Body) } }() } }
接收端2
package main import ( rabbitMQ "RabbitMQ" "log" ) func main(){ // 1.接收者,首先自己队列 // 2.创建交换机 // 3.将自己绑定到交换机上 // 4.接收交换机上发过来的消息 //第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字 //1 receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2") //2 //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型 rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic") //3 receive_mq.Bind("exchange","*.*.rabbit") receive_mq.Bind("exchange","lazy.#") //4 for{ //接收消息时,指定 msgs := receive_mq .Consume() go func() { for d := range msgs { log.Printf("recevie2 Received a message: %s", d.Body) } }() } }
六、rabbitmq部分封装代码及准备工作
目录参考:
准备工作:
1.我们再创建go项目时,首先指定gopath目录,然后在目录下创建bin、src、pkg目录。
2.下载github.com/streadway/amqp包,会自动添加到项目的pkg目录下。
go get github.com/streadway/amqp
3.在rabbitmq服务器上创建用户,指定管理员,并赋予访问权限。
4.rabbitmq封装
package rabbitMQ import ( "encoding/json" "github.com/streadway/amqp" "log" ) //声明队列类型 type RabbitMQ struct { channel *amqp.Channel Name string exchange string } //连接服务器 func Connect(s string) * RabbitMQ{ //连接rabbitmq conn,e := amqp.Dial(s) failOnError(e,"连接Rabbitmq服务器失败!") ch ,e :=conn.Channel() failOnError(e,"无法打开频道!") mq := new(RabbitMQ) mq.channel =ch return mq } //初始化单个消息队列 //第一个参数:rabbitmq服务器的链接,第二个参数:队列名字 func New(s string,name string) * RabbitMQ{ //连接rabbitmq conn,e := amqp.Dial(s) failOnError(e,"连接Rabbitmq服务器失败!") ch ,e :=conn.Channel() failOnError(e,"无法打开频道!") q,e := ch.QueueDeclare( name,//队列名 false,//是否开启持久化 true,//不使用时删除 false, //排他 false, //不等待 nil, //参数 ) failOnError(e,"初始化队列失败!") mq := new(RabbitMQ) mq.channel =ch mq.Name =q.Name return mq } //批量初始化消息队列 //第一个参数:rabbitmq服务器的链接,第二个参数:队列名字列表 //声明交换机 func (q *RabbitMQ)QueueDeclare(queue string){ _,e := q.channel.QueueDeclare(queue,false,true,false,false,nil) failOnError(e,"声明交换机!") } //删除交换机 func (q *RabbitMQ)QueueDelete(queue string){ _,e := q.channel.QueueDelete(queue,false,true,false) failOnError(e,"删除队列失败!") } //配置队列参数 func (q *RabbitMQ)Qos(){ e := q.channel.Qos(1,0,false) failOnError(e,"无法设置QoS") } //配置交换机参数 //初始化交换机 //第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型 func NewExchange(s string,name string,typename string){ //连接rabbitmq conn,e := amqp.Dial(s) failOnError(e,"连接Rabbitmq服务器失败!") ch ,e :=conn.Channel() failOnError(e,"无法打开频道!") e = ch.ExchangeDeclare( name, // name typename, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(e,"初始化交换机失败!") } //删除交换机 func (q *RabbitMQ)ExchangeDelete(exchange string){ e := q.channel.ExchangeDelete(exchange,false,true) failOnError(e,"绑定队列失败!") } //绑定消息队列到哪个exchange func (q *RabbitMQ)Bind(exchange string,key string){ e := q.channel.QueueBind( q.Name, key, exchange, false, nil, ) failOnError(e,"绑定队列失败!") q.exchange = exchange } //向消息队列发送消息 //Send方法可以往某个消息队列发送消息 func (q *RabbitMQ) Send(queue string,body interface{}){ str,e := json.Marshal(body) failOnError(e,"消息序列化失败!") e = q.channel.Publish( "",//交换 queue,//路由键 false, //必填 false, //立即 amqp.Publishing{ ReplyTo:q.Name, Body:[]byte(str), }) msg := "向队列:"+q.Name+"发送消息失败!" failOnError(e,msg) } //向exchange发送消息 //Publish方法可以往某个exchange发送消息 func (q *RabbitMQ) Publish(exchange string,body interface{},key string) { str,e := json.Marshal(body) failOnError(e,"消息序列化失败!") e = q.channel.Publish( exchange, key, false, false, amqp.Publishing{ReplyTo:q.Name, Body:[]byte(str)}, ) failOnError(e,"向路由发送消息失败!") } //接收某个消息队列的消息 func (q * RabbitMQ) Consume() <-chan amqp.Delivery{ c,e :=q.channel.Consume( q.Name,//指定从哪个队列中接收消息 "", true, false, false, false, nil, ) failOnError(e,"接收消息失败!") return c } //关闭队列连接 func (q *RabbitMQ) Close() { q.channel.Close() } //错误处理函数 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
其中函数参数解析可以参考:Rabbitmq详解(基于go语言)