zoukankan      html  css  js  c++  java
  • golang使用rabbitmq多个消费者

    生产者:

    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "log"
    )
    
    func main()  {
    
        //链接mq
        conn, err := amqp.Dial("amqp://guest:guest@192.168.2.232:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
    
        //通道
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
    
        //设置数据
        q, err := ch.QueueDeclare(
            "a_test", // name
            true,   // durable
            false,   // delete when unused
            false,   // exclusive
            false,   // no-wait
            nil,     // arguments
        )
        failOnError(err, "Failed to declare a queue")
    
        for i:=1;i<=1000;i++{
            body := fmt.Sprintf("{"order_id":%d}",i)
            fmt.Println(body)
            err = ch.Publish(
                "",     // exchange
                q.Name, // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing {
                    ContentType: "text/plain",
                    Body:        []byte(body),
                })
        }
    
    
    
        failOnError(err, "Failed to publish a message")
    }
    
    
    
    // failOnError does nothing when err is nil. Otherwise it logs msg followed
    // by the error text through log.Fatalf, which terminates the program.
    func failOnError(err error, msg string) {
        if err == nil {
            return
        }
        log.Fatalf("%s: %s", msg, err)
    }

    消费者:

    package main
    
    import (
        "github.com/streadway/amqp"
        "log"
        "time"
    )
    
    
    
    // main is the consumer: it opens one RabbitMQ connection and starts three
    // goroutines, each with its own channel, that consume from the durable
    // queue "a_test" with manual acknowledgement. main returns once every
    // worker has stopped.
    func main() {
        conn, err := amqp.Dial("amqp://guest:guest@192.168.2.232:5672/")
        if err != nil {
            // Without a connection nothing below can work; continuing would
            // dereference a nil connection.
            log.Fatalf("err %s", err)
        }
        defer conn.Close()

        const workers = 3
        done := make(chan struct{})
        for i := 1; i <= workers; i++ {
            go func(routineNum int) {
                // Registered first so it runs last, after ch.Close().
                defer func() { done <- struct{}{} }()

                ch, err := conn.Channel()
                if err != nil {
                    log.Printf("err %s", err)
                    return
                }
                defer ch.Close()

                q, err := ch.QueueDeclare(
                    "a_test", // name
                    true,     // durable
                    false,    // delete when unused
                    false,    // exclusive
                    false,    // no-wait
                    nil,      // arguments
                )
                if err != nil {
                    log.Printf("err %s", err)
                    return
                }

                msgs, err := ch.Consume(
                    q.Name, // queue
                    "",     // consumer
                    false,  // auto-ack
                    false,  // exclusive
                    false,  // no-local
                    false,  // no-wait
                    nil,    // args
                )
                if err != nil {
                    log.Printf("err %s", err)
                    return
                }

                // NOTE(review): this loop ends only when the delivery channel
                // is closed, presumably when the AMQP channel or connection
                // goes away; confirm against the amqp package documentation.
                for msg := range msgs {
                    log.Printf("协程 %d    Received a message: %s", routineNum, msg.Body)
                    // Simulate slow processing of the message.
                    time.Sleep(5 * time.Second)
                    // Acknowledge only this delivery; messages are handled one
                    // at a time, so there is nothing earlier left to batch-ack.
                    if err := msg.Ack(false); err != nil {
                        log.Printf("err %s", err)
                    }
                }
            }(i)
        }

        // Wait for every worker instead of blocking forever, so the process
        // exits once no consumer is left running.
        for i := 0; i < workers; i++ {
            <-done
        }
    }

    返回结果:

    2020/04/15 11:11:22 协程 3    Received a message: {"order_id":69}
    2020/04/15 11:11:22 协程 1    Received a message: {"order_id":601}
    2020/04/15 11:11:22 协程 2    Received a message: {"order_id":75}
    2020/04/15 11:11:27 协程 1    Received a message: {"order_id":602}
    2020/04/15 11:11:27 协程 3    Received a message: {"order_id":70}
    2020/04/15 11:11:27 协程 2    Received a message: {"order_id":76}
    2020/04/15 11:11:32 协程 1    Received a message: {"order_id":603}
    2020/04/15 11:11:32 协程 3    Received a message: {"order_id":71}
    2020/04/15 11:11:32 协程 2    Received a message: {"order_id":77}
  • 相关阅读:
    Clean Code读书笔记
    Junit 断言 assertThat Hamcrest匹配器
    SpringMVC 常用注解
    SpringMVC 流程 配置 接口
    ng-select ng-options ng-repeat的用法与区别
    javascript总结
    intellij安装 配置 创建项目
    git常用操作指令
    springmvc报错 org.springframework.web.servlet.DispatcherServlet
    linux笔记:RPM软件包管理-源码包管理
  • 原文地址:https://www.cnblogs.com/sunlong88/p/12703987.html
Copyright © 2011-2022 走看看