zoukankan      html  css  js  c++  java
  • golang使用rabbitmq正确姿势

    go mod init  github.com/ichunt2019/go-rabbitmq

    D:gocodego-rabbitmqutils abbitmq eceiver.go

    主要是实现了rabbimq 生产者 消费者

    消费者:实现失败尝试机制

    package rabbitmq
    
    import (
        //"errors"
        "fmt"
        "github.com/streadway/amqp"
        "log"
    )
    
    
    // 定义全局变量,指针类型
    var mqConn *amqp.Connection
    var mqChan *amqp.Channel
    
    // 定义生产者接口
    type Producer interface {
        MsgContent() string
    }
    
    // 定义生产者接口
    type RetryProducer interface {
        MsgContent() string
    }
    
    // 定义接收者接口
    type Receiver interface {
        Consumer([]byte)    error
        FailAction([]byte)  error
    }
    
    // 定义RabbitMQ对象
    type RabbitMQ struct {
        connection *amqp.Connection
        Channel *amqp.Channel
        dns string
        QueueName   string            // 队列名称
        RoutingKey  string            // key名称
        ExchangeName string           // 交换机名称
        ExchangeType string           // 交换机类型
        producerList []Producer
        retryProducerList []RetryProducer
        receiverList []Receiver
    }
    
    // 定义队列交换机对象
    type QueueExchange struct {
        QuName  string           // 队列名称
        RtKey   string           // key值
        ExName  string           // 交换机名称
        ExType  string           // 交换机类型
        Dns     string              //链接地址
    }
    
    
    
    // 链接rabbitMQ
    func (r *RabbitMQ)MqConnect() (err error){
    
        mqConn, err = amqp.Dial(r.dns)
        r.connection = mqConn   // 赋值给RabbitMQ对象
    
        if err != nil {
            fmt.Printf("关闭mq链接失败  :%s 
    ", err)
        }
    
        return
    }
    
    // 关闭mq链接
    func (r *RabbitMQ)CloseMqConnect() (err error){
    
        err = r.connection.Close()
        if err != nil{
            fmt.Printf("关闭mq链接失败  :%s 
    ", err)
        }
        return
    }
    
    // 链接rabbitMQ
    func (r *RabbitMQ)MqOpenChannel() (err error){
        mqConn := r.connection
        r.Channel, err = mqConn.Channel()
        //defer mqChan.Close()
        if err != nil {
            fmt.Printf("MQ打开管道失败:%s 
    ", err)
        }
        return err
    }
    
    // 链接rabbitMQ
    func (r *RabbitMQ)CloseMqChannel() (err error){
        r.Channel.Close()
        if err != nil {
            fmt.Printf("关闭mq链接失败  :%s 
    ", err)
        }
        return err
    }
    
    
    
    
    // 创建一个新的操作对象
    func NewMq(q QueueExchange) RabbitMQ {
        return RabbitMQ{
            QueueName:q.QuName,
            RoutingKey:q.RtKey,
            ExchangeName: q.ExName,
            ExchangeType: q.ExType,
            dns:q.Dns,
        }
    }
    
    func (mq *RabbitMQ) sendMsg (body string)  {
        err :=mq.MqOpenChannel()
        ch := mq.Channel
        if err != nil{
            log.Printf("Channel err  :%s 
    ", err)
        }
    
        defer mq.Channel.Close()
        if mq.ExchangeName != "" {
            if mq.ExchangeType == ""{
                mq.ExchangeType = "direct"
            }
            err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
            if err != nil {
                log.Printf("ExchangeDeclare err  :%s 
    ", err)
            }
        }
    
    
        // 用于检查队列是否存在,已经存在不需要重复声明
        _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
        if err != nil {
            log.Printf("QueueDeclare err :%s 
    ", err)
        }
        // 绑定任务
        if mq.RoutingKey != "" && mq.ExchangeName != "" {
            err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
            if err != nil {
                log.Printf("QueueBind err :%s 
    ", err)
            }
        }
    
        if mq.ExchangeName != "" && mq.RoutingKey != ""{
            err = mq.Channel.Publish(
                mq.ExchangeName,     // exchange
                mq.RoutingKey, // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing {
                    ContentType: "text/plain",
                    Body:        []byte(body),
                })
        }else{
            err = mq.Channel.Publish(
                "",     // exchange
                mq.QueueName, // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing {
                    ContentType: "text/plain",
                    Body:        []byte(body),
                })
        }
    
    }
    
    
    func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {
        err :=mq.MqOpenChannel()
        ch := mq.Channel
        if err != nil{
            log.Printf("Channel err  :%s 
    ", err)
        }
        defer mq.Channel.Close()
    
        if mq.ExchangeName != "" {
            if mq.ExchangeType == ""{
                mq.ExchangeType = "direct"
            }
            err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
            if err != nil {
                log.Printf("ExchangeDeclare err  :%s 
    ", err)
            }
        }
    
        //原始路由key
        oldRoutingKey := args[0]
        //原始交换机名
        oldExchangeName := args[1]
    
        table := make(map[string]interface{},3)
        table["x-dead-letter-routing-key"] = oldRoutingKey
        if oldExchangeName != "" {
            table["x-dead-letter-exchange"] = oldExchangeName
        }else{
            mq.ExchangeName = ""
            table["x-dead-letter-exchange"] = ""
        }
    
        table["x-message-ttl"] = int64(20000)
    
        //fmt.Printf("%+v",table)
        //fmt.Printf("%+v",mq)
        // 用于检查队列是否存在,已经存在不需要重复声明
        _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
        if err != nil {
            log.Printf("QueueDeclare err :%s 
    ", err)
        }
        // 绑定任务
        if mq.RoutingKey != "" && mq.ExchangeName != "" {
            err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
            if err != nil {
                log.Printf("QueueBind err :%s 
    ", err)
            }
        }
    
        header := make(map[string]interface{},1)
    
        header["retry_nums"] = retry_nums + int32(1)
    
        var ttl_exchange string
        var ttl_routkey string
    
        if(mq.ExchangeName != "" ){
            ttl_exchange = mq.ExchangeName
        }else{
            ttl_exchange = ""
        }
    
    
        if mq.RoutingKey != "" && mq.ExchangeName != ""{
            ttl_routkey = mq.RoutingKey
        }else{
            ttl_routkey = mq.QueueName
        }
    
        //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s 
    ",ttl_exchange,ttl_routkey)
        err = mq.Channel.Publish(
            ttl_exchange,     // exchange
            ttl_routkey, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
                Headers:header,
            })
        if err != nil {
            fmt.Printf("MQ任务发送失败:%s 
    ", err)
    
        }
    
    }
    
    
    // 监听接收者接收任务 消费者
    func (mq *RabbitMQ) ListenReceiver(receiver Receiver,routineNum int) {
        err :=mq.MqOpenChannel()
        ch := mq.Channel
        if err != nil{
            log.Printf("Channel err  :%s 
    ", err)
        }
        defer mq.Channel.Close()
        if mq.ExchangeName != "" {
            if mq.ExchangeType == ""{
                mq.ExchangeType = "direct"
            }
            err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
            if err != nil {
                log.Printf("ExchangeDeclare err  :%s 
    ", err)
            }
        }
    
    
        // 用于检查队列是否存在,已经存在不需要重复声明
        _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
        if err != nil {
            log.Printf("QueueDeclare err :%s 
    ", err)
        }
        // 绑定任务
        if mq.RoutingKey != "" && mq.ExchangeName != "" {
            err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
            if err != nil {
                log.Printf("QueueBind err :%s 
    ", err)
            }
        }
        // 获取消费通道,确保rabbitMQ一个一个发送消息
        err =  ch.Qos(1, 0, false)
        msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)
        if err != nil {
            log.Printf("Consume err :%s 
    ", err)
        }
        for msg := range msgList {
            retry_nums,ok := msg.Headers["retry_nums"].(int32)
            if(!ok){
                retry_nums = int32(0)
            }
            // 处理数据
            err := receiver.Consumer(msg.Body)
            if err!=nil {
                //消息处理失败 进入延时尝试机制
                if retry_nums < 3{
                    fmt.Println(string(msg.Body))
                    fmt.Printf("消息处理失败 消息开始进入尝试  ttl延时队列 
    ")
                    retry_msg(msg.Body,retry_nums,QueueExchange{
                            mq.QueueName,
                            mq.RoutingKey,
                            mq.ExchangeName,
                            mq.ExchangeType,
                            mq.dns,
                        })
                }else{
                    //消息失败 入库db
                    fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 
    ")
                    receiver.FailAction(msg.Body)
                }
                err = msg.Ack(true)
                if err != nil {
                    fmt.Printf("确认消息未完成异常:%s 
    ", err)
                }
            }else {
                // 确认消息,必须为false
                err = msg.Ack(true)
    
                if err != nil {
                    fmt.Printf("消息消费ack失败 err :%s 
    ", err)
                }
            }
    
        }
    }
    
    //消息处理失败之后 延时尝试
    func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
        //原始队列名称 交换机名称
        oldQName := queueExchange.QuName
        oldExchangeName := queueExchange.ExName
        oldRoutingKey := queueExchange.RtKey
        if oldRoutingKey == "" || oldExchangeName == ""{
            oldRoutingKey = oldQName
        }
    
        if queueExchange.QuName != "" {
            queueExchange.QuName = queueExchange.QuName + "_retry_3";
        }
    
        if queueExchange.RtKey != "" {
            queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
        }else{
            queueExchange.RtKey = queueExchange.QuName + "_retry_3";
        }
    
    //fmt.Printf("%+v",queueExchange)
    
        mq := NewMq(queueExchange)
        mq.MqConnect()
    
        defer func(){
            mq.CloseMqConnect()
        }()
        //fmt.Printf("%+v",queueExchange)
        mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)
    
    
    }
    
    
    func Send(queueExchange QueueExchange,msg string){
        mq := NewMq(queueExchange)
        mq.MqConnect()
    
        defer func(){
            mq.CloseMqConnect()
        }()
        mq.sendMsg(msg)
    
    }
    
    /*
    runNums  开启并发执行任务数量
     */
    func Recv(queueExchange QueueExchange,receiver Receiver,runNums int){
        mq := NewMq(queueExchange)
        mq.MqConnect()
    
        defer func(){
            mq.CloseMqConnect()
        }()
    
        forever := make(chan bool)
        for i:=1;i<=runNums;i++{
            go func(routineNum int) {
                defer mq.Channel.Close()
                // 验证链接是否正常
                mq.MqOpenChannel()
                mq.ListenReceiver(receiver,routineNum)
            }(i)
        }
        <-forever
    }
    
    
    type retryPro struct {
        msgContent   string
    }
    View Code

    D:gocodego-rabbitmqdemo ecv.go

    package main
    
    import (
        "fmt"
        "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
        "time"
    )
    
    type RecvPro struct {
    
    }
    
    //// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db
    /*
    返回值 error 为nil  则表示该消息消费成功
    否则消息会进入ttl延时队列  重复尝试消费3次
    3次后消息如果还是失败 消息就执行失败  进入告警 FailAction
     */
    func (t *RecvPro) Consumer(dataByte []byte) error {
        //time.Sleep(500*time.Microsecond)
        //return errors.New("顶顶顶顶")
        fmt.Println(string(dataByte))
        time.Sleep(1*time.Second)
        //return errors.New("顶顶顶顶")
        return nil
    }
    
    //消息已经消费3次 失败了 请进行处理
    /*
    如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等
     */
    func (t *RecvPro) FailAction(dataByte []byte) error {
        fmt.Println(string(dataByte))
        fmt.Println("任务处理失败了,我要进入db日志库了")
        fmt.Println("任务处理失败了,发送钉钉消息通知主人")
        return nil
    }
    
    
    
    func main() {
        t := &RecvPro{}
    
    
    
        //rabbitmq.Recv(rabbitmq.QueueExchange{
        //    "a_test_0001",
        //    "a_test_0001",
        //    "",
        //    "",
        //    "amqp://guest:guest@192.168.2.232:5672/",
        //},t,5)
    
        /*
            runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了
         */
        rabbitmq.Recv(rabbitmq.QueueExchange{
            "a_test_0001",
            "a_test_0001",
            "hello_go",
            "direct",
            "amqp://guest:guest@192.168.2.232:5672/",
        },t,3)
    
    
    
    }

    D:gocodego-rabbitmqdemosend.go

    package main
    
    import (
        "fmt"
        _ "fmt"
        "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
    )
    
    func main() {
    
    
        for i := 1;i<10;i++{
            body := fmt.Sprintf("{"order_id":%d}",i)
            fmt.Println(body)
    
            /**
                使用默认的交换机
                如果是默认交换机
                type QueueExchange struct {
                QuName  string           // 队列名称
                RtKey   string           // key值
                ExName  string           // 交换机名称
                ExType  string           // 交换机类型
                Dns     string              //链接地址
                }
                如果你喜欢使用默认交换机
                RtKey  此处建议填写成 RtKey 和 QuName 一样的值
             */
    
            //queueExchange := rabbitmq.QueueExchange{
            //    "a_test_0001",
            //    "a_test_0001",
            //    "",
            //    "",
            //    "amqp://guest:guest@192.168.2.232:5672/",
            //}
    
            /*
             使用自定义的交换机
             */
            queueExchange := rabbitmq.QueueExchange{
                "a_test_0001",
                "a_test_0001",
                "hello_go",
                "direct",
                "amqp://guest:guest@192.168.2.232:5672/",
            }
    
            rabbitmq.Send(queueExchange,body)
    
    
        }
    
    
    }

    使用说明:

    go get github.com/ichunt2019/go-rabbitmq

    发送消息

    package main
    
    import (
        "fmt"
        _ "fmt"
        "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
    )
    
    func main() {
    
    
        for i := 1;i<10;i++{
            body := fmt.Sprintf("{"order_id":%d}",i)
            fmt.Println(body)
    
            /**
                使用默认的交换机
                如果是默认交换机
                type QueueExchange struct {
                QuName  string           // 队列名称
                RtKey   string           // key值
                ExName  string           // 交换机名称
                ExType  string           // 交换机类型
                Dns     string              //链接地址
                }
                如果你喜欢使用默认交换机
                RtKey  此处建议填写成 RtKey 和 QuName 一样的值
             */
    
            //queueExchange := rabbitmq.QueueExchange{
            //    "a_test_0001",
            //    "a_test_0001",
            //    "",
            //    "",
            //    "amqp://guest:guest@192.168.2.232:5672/",
            //}
    
            /*
             使用自定义的交换机
             */
            queueExchange := rabbitmq.QueueExchange{
                "a_test_0001",
                "a_test_0001",
                "hello_go",
                "direct",
                "amqp://guest:guest@192.168.2.232:5672/",
            }
    
            rabbitmq.Send(queueExchange,body)
    
    
        }
    
    
    }

    消费消息

    package main
    
    import (
        "fmt"
        "github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
        "time"
    )
    
    type RecvPro struct {
    
    }
    
    //// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db
    /*
    返回值 error 为nil  则表示该消息消费成功
    否则消息会进入ttl延时队列  重复尝试消费3次
    3次后消息如果还是失败 消息就执行失败  进入告警 FailAction
     */
    func (t *RecvPro) Consumer(dataByte []byte) error {
        //time.Sleep(500*time.Microsecond)
        //return errors.New("顶顶顶顶")
        fmt.Println(string(dataByte))
        time.Sleep(1*time.Second)
        //return errors.New("顶顶顶顶")
        return nil
    }
    //消息已经消费3次 失败了 请进行处理
    /*
    如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等
     */
    func (t *RecvPro) FailAction(dataByte []byte) error {
        fmt.Println(string(dataByte))
        fmt.Println("任务处理失败了,我要进入db日志库了")
        fmt.Println("任务处理失败了,发送钉钉消息通知主人")
        return nil
    }
    
    
    
    func main() {
        t := &RecvPro{}
    
    
    
        //rabbitmq.Recv(rabbitmq.QueueExchange{
        //    "a_test_0001",
        //    "a_test_0001",
        //    "",
        //    "",
        //    "amqp://guest:guest@192.168.2.232:5672/",
        //},t,5)
    
        /*
            runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了
         */
        rabbitmq.Recv(rabbitmq.QueueExchange{
            "a_test_0001",
            "a_test_0001",
            "hello_go",
            "direct",
            "amqp://guest:guest@192.168.2.232:5672/",
        },t,3)
    
    
    
    }
    
    说明:
    
    rabbitmq.Recv(rabbitmq.QueueExchange{
            "a_test_0001",
            "a_test_0001",
            "hello_go",
            "direct",
            "amqp://guest:guest@192.168.2.232:5672/",
        },t,3)

    第一个参数 QueueExchange说明

    // 定义队列交换机对象
    type QueueExchange struct {
        QuName  string           // 队列名称
        RtKey   string           // key值
        ExName  string           // 交换机名称
        ExType  string           // 交换机类型
        Dns     string              //链接地址
    }

    第二个参数 type Receiver interface说明

    ConsumerFailAction
    拿到消息后,用户可以处理任务,如果消费成功 返回nil即可,如果处理失败,返回一个自定义error即可 由于消息内部自带消息失败尝试3次机制,3次如果失败后就没必要一直存储在mq,所以此处扩展,可以用作消息补偿和告警
    // 定义接收者接口
    type Receiver interface {
        Consumer([]byte)    error
        FailAction([]byte)  error
    }

    第三个参数:runNusm

    runNusm
    消息并发数,同时可以处理多少任务 普通任务 设置为1即可 需要并发的设置成3-5即可
  • 相关阅读:
    383. Ransom Note
    598. Range Addition II
    453. Minimum Moves to Equal Array Elements
    492. Construct the Rectangle
    171. Excel Sheet Column Number
    697. Degree of an Array
    665. Nondecreasing Array
    视频网站使用H265编码能提高视频清晰度吗?
    现阶段的语音视频通话SDK需要解决哪些问题?
    企业远程高清会议平台视频会议系统在手机端使用的必备要求有哪些?
  • 原文地址:https://www.cnblogs.com/sunlong88/p/12717820.html
Copyright © 2011-2022 走看看