在工作中发现,有些时候消息因为某些原因在消费一次后,如果消息失败,这时候不ack,消息就回一直重回队列首部,造成消息拥堵。
如是有了如下思路:
消息进入队列前,header默认有参数 retry_num=0 表示尝试次数;
消费者在消费时候的,如果消息失败,就把消息插入另外一个队列(队列abc);该队列abc 绑定一个死信队列(原始消费的队列),这样形成一个回路;
当消息失败后,消息就进入队列abc,队列abc拥有ttl过期时间,ttl过期时间到了后,该消息进入死信队列(死信队列刚好是刚开始我们消费的队列);
这样消息就又回到原始消费队列尾部了;
最后可以通过队列消息头部的header参数retry_num 可以控制消息消费多少次后,直接插入db日志;
db日志可以记录交换机 路由,queuename,这样,可以做一个后台管理,可以手动一次把消息重新放入队列,进行消息(因为有时间消费队列里面可能在请求其它服务,其它服务也可能会挂掉)
这时候消息无论你消费多少次都没有用,但是入库db后,可以一键重回队列消息(当我们知道服务已经正常后)
图解:
附上代码
git clone https://github.com/sunlongv520/go-msgserver.git
package rabbitmq import ( "errors" "fmt" "github.com/streadway/amqp" "sync" "time" ) // 定义全局变量,指针类型 var mqConn *amqp.Connection var mqChan *amqp.Channel // 定义生产者接口 type Producer interface { MsgContent() string } // 定义生产者接口 type RetryProducer interface { MsgContent() string } // 定义接收者接口 type Receiver interface { Consumer([]byte) error } // 定义RabbitMQ对象 type RabbitMQ struct { connection *amqp.Connection channel *amqp.Channel dns string queueName string // 队列名称 routingKey string // key名称 exchangeName string // 交换机名称 exchangeType string // 交换机类型 producerList []Producer retryProducerList []RetryProducer receiverList []Receiver mu sync.RWMutex wg sync.WaitGroup } // 定义队列交换机对象 type QueueExchange struct { QuName string // 队列名称 RtKey string // key值 ExName string // 交换机名称 ExType string // 交换机类型 Dns string //链接地址 } // 链接rabbitMQ func (r *RabbitMQ)mqConnect() (err error){ //var err error //RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%d/", "guest", "guest", "192.168.2.232", 5672) //mqConn, err = amqp.Dial(RabbitUrl) mqConn, err = amqp.Dial(r.dns) r.connection = mqConn // 赋值给RabbitMQ对象 if err != nil { return err //fmt.Printf("MQ打开链接失败:%s ", err) } mqChan, err = mqConn.Channel() r.channel = mqChan // 赋值给RabbitMQ对象 if err != nil { return err //fmt.Printf("MQ打开管道失败:%s ", err) } return err } // 关闭RabbitMQ连接 func (r *RabbitMQ)mqClose() { // 先关闭管道,再关闭链接 err := r.channel.Close() if err != nil { fmt.Printf("MQ管道关闭失败:%s ", err) } err = r.connection.Close() if err != nil { fmt.Printf("MQ链接关闭失败:%s ", err) } } // 创建一个新的操作对象 func New(q *QueueExchange) *RabbitMQ { return &RabbitMQ{ queueName:q.QuName, routingKey:q.RtKey, exchangeName: q.ExName, exchangeType: q.ExType, dns:q.Dns, } } // 启动RabbitMQ客户端,并初始化 func (r *RabbitMQ) Start() (err error){ // 开启监听生产者发送任务 for _, producer := range r.producerList { err = r.listenProducer(producer) } // 开启监听接收者接收任务 for _, receiver := range r.receiverList { //r.listenReceiver(receiver) r.wg.Add(1) go func() { err = r.listenReceiver(receiver) }() } r.wg.Wait() time.Sleep(time.Microsecond*100) return err } type SendRbmqPro struct { msgContent string } // 实现生产者 func (t *SendRbmqPro) MsgContent() string { return t.msgContent } // 注册发送指定队列指定路由的生产者 func (r *RabbitMQ) RegisterProducer(msg string) { a := &SendRbmqPro{msgContent:msg} a.MsgContent() r.producerList = append(r.producerList, a) } // 发送任务 func (r *RabbitMQ) listenProducer(producer Producer) (err error){ // 验证链接是否正常,否则重新链接 if r.channel == nil { err = r.mqConnect() if err !=nil { return err } } err = r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, false, nil) if err != nil { fmt.Printf("MQ注册交换机失败:%s ", err) } _, err = r.channel.QueueDeclare(r.queueName, true, false, false, false, nil) if err != nil { fmt.Printf("MQ注册队列失败:%s ", err) } // 队列绑定 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true,nil) if err != nil { fmt.Printf("MQ绑定队列失败:%s ", err) } header := make(map[string]interface{},1) header["retry_nums"] = int32(0) // 发送任务消息 err = r.channel.Publish(r.exchangeName, r.routingKey, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(producer.MsgContent()), Headers:header, }) if err != nil { fmt.Printf("MQ任务发送失败:%s ", err) } return err } func (r *RabbitMQ) listenRetryProducer(producer RetryProducer,retry_nums int32 ,args ...string) { fmt.Println("消息处理失败,进入延时队列.....") //defer r.mqClose() // 验证链接是否正常,否则重新链接 if r.channel == nil { r.mqConnect() } err := r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, false, nil) if err != nil { fmt.Printf("MQ注册交换机失败:%s ", err) return } //原始路由key oldRoutingKey := args[0] //原始交换机名 oldExchangeName := args[1] table := make(map[string]interface{},3) table["x-dead-letter-routing-key"] = oldRoutingKey table["x-dead-letter-exchange"] = oldExchangeName table["x-message-ttl"] = int64(20000) _, err = r.channel.QueueDeclare(r.queueName, true, false, false, false, table) if err != nil { fmt.Printf("MQ注册队列失败:%s ", err) return } // 队列绑定 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true,nil) if err != nil { fmt.Printf("MQ绑定队列失败:%s ", err) return } header := make(map[string]interface{},1) header["retry_nums"] = retry_nums + int32(1) // 发送任务消息 err = r.channel.Publish(r.exchangeName, r.routingKey, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(producer.MsgContent()), Headers:header, }) if err != nil { fmt.Printf("MQ任务发送失败:%s ", err) return } } // 注册接收指定队列指定路由的数据接收者 func (r *RabbitMQ) RegisterReceiver(receiver Receiver) { r.mu.Lock() r.receiverList = append(r.receiverList, receiver) r.mu.Unlock() } // 监听接收者接收任务 消费者 func (r *RabbitMQ) listenReceiver(receiver Receiver) (err error) { // 处理结束关闭链接 defer r.mqClose() defer r.wg.Done() //defer // 验证链接是否正常 if r.channel == nil { err = r.mqConnect() if err != nil{ return errors.New(fmt.Sprintf("MQ注册队列失败:%s ", err)) } } // 用于检查队列是否存在,已经存在不需要重复声明 _, err = r.channel.QueueDeclare(r.queueName, true, false, false, false, nil) if err != nil { return errors.New(fmt.Sprintf("MQ注册队列失败:%s ", err)) } // 绑定任务 err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, false, nil) if err != nil { return errors.New(fmt.Sprintf("绑定队列失败:%s ", err)) } // 获取消费通道,确保rabbitMQ一个一个发送消息 err = r.channel.Qos(1, 0, false) msgList, err := r.channel.Consume(r.queueName, "", false, false, false, false, nil) if err != nil { return errors.New(fmt.Sprintf("获取消费通道异常:%s ", err)) } for msg := range msgList { retry_nums,ok := msg.Headers["retry_nums"].(int32) if(!ok){ retry_nums = int32(0) } // 处理数据 err := receiver.Consumer(msg.Body) if err!=nil { //消息处理失败 进入延时尝试机制 if retry_nums < 3{ r.retry_msg(msg.Body,retry_nums) }else{ //消息失败 入库db fmt.Printf("消息处理失败 入库db") } err = msg.Ack(true) if err != nil { return errors.New(fmt.Sprintf("确认消息未完成异常:%s ", err)) } }else { // 确认消息,必须为false err = msg.Ack(true) if err != nil { return errors.New(fmt.Sprintf("确认消息完成异常:%s ", err)) } return err } } return } type retryPro struct { msgContent string } // 实现生产者 func (t *retryPro) MsgContent() string { return t.msgContent } //消息处理失败之后 延时尝试 func(r *RabbitMQ) retry_msg(Body []byte,retry_nums int32){ queueName := r.queueName+"_retry_3" routingKey := r.queueName+"_retry_3" exchangeName := r.exchangeName queueExchange := &QueueExchange{ queueName, routingKey, exchangeName, "direct", r.dns, } mq := New(queueExchange) msg := fmt.Sprintf("%s",Body) t := &retryPro{ msg, } mq.listenRetryProducer(t,retry_nums,r.routingKey,exchangeName) }
package main import ( "fmt" "github.com/ichunt2019/go-msgserver/utils/rabbitmq" "time" "errors" ) type RecvPro struct { } //// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db func (t *RecvPro) Consumer(dataByte []byte) error { fmt.Println(string(dataByte)) return errors.New("顶顶顶顶") //return nil } func main() { //消费者实现 下面接口即可 //type Receiver interface { // Consumer([]byte) error //} t := &RecvPro{} queueExchange := &rabbitmq.QueueExchange{ "fengkong_static_count", "fengkong_static_count", "fengkong_exchange", "direct", "amqp://guest:guest@192.168.2.232:5672/", } for{ mq := rabbitmq.New(queueExchange) mq.RegisterReceiver(t) err :=mq.Start() if err != nil{ fmt.Println(err) } time.Sleep(time.Second) } }
package main import ( "fmt" _ "fmt" "github.com/ichunt2019/go-msgserver/utils/rabbitmq" "strconv" ) func main() { queueExchange := &rabbitmq.QueueExchange{ "b_test_rabbit", "b_test_rabbit", "b_test_rabbit_mq", "direct", "amqp://guest:guest@192.168.2.232:5672/", } mq := rabbitmq.New(queueExchange) for i := 0;i<10;i++{ mq.RegisterProducer("这是测试任务"+strconv.Itoa(i)) } err := mq.Start() if(err != nil){ fmt.Println("发送消息失败") } }