zoukankan      html  css  js  c++  java
  • golang rabbitmq消费者与生产者(direct类型交换机)

    一 消费者

    package rabbitmq
    
    import (
        "bind_mqtt/config"
        "bind_mqtt/controller"
        . "bind_mqtt/structs"
        llog "github.com/sirupsen/logrus"
        "github.com/streadway/amqp"
        "github.com/timest/env"
    )
    
    func Rabbitmq() {
        msgid := 0
    
        cfg := new(Bindcfg)
        err := env.Fill(cfg)
        Rbmq_host := config.RBMQ_HOST
        Rbmq_pt := config.RBMQ_PT
        Rbmq_un := config.RBMQ_UN
        Rbmq_pw := config.RBMQ_PW
        Rbmq_vh := config.RBMQ_VH
        Rbmq_ex := config.RBMQ_EX
        Rbmq_ext := config.RBMQ_EXT
        Rbmq_que := config.RBMQ_QUE
        Rbmq_rk := config.RBMQ_RK
        if err != nil {
            //panic(err)
            llog.Info("panic: env rbmq")
        }else{
            Rbmq_host = cfg.RBMQHOST
            Rbmq_pt = cfg.RBMQPT
            Rbmq_un = cfg.RBMQUN
            Rbmq_pw = cfg.RBMQPW
            Rbmq_vh = cfg.RBMQVH
            Rbmq_ex = cfg.RBMQEX
            Rbmq_ext = cfg.RBMQEXT
            Rbmq_que = cfg.RBMQQUE
            Rbmq_rk = cfg.RBMQRK
            if Rbmq_host != "" && Rbmq_pt != "" &&Rbmq_un != "" &&Rbmq_pw != "" &&Rbmq_vh != "" && Rbmq_ex != "" &&Rbmq_ext != "" &&Rbmq_que != "" &&Rbmq_rk != ""{
                llog.Info("load env rbmq suc")
            }else{
                llog.Info("no env rbmq")
                Rbmq_host = config.RBMQ_HOST
                Rbmq_pt = config.RBMQ_PT
                Rbmq_un = config.RBMQ_UN
                Rbmq_pw = config.RBMQ_PW
                Rbmq_vh = config.RBMQ_VH
                Rbmq_ex = config.RBMQ_EX
                Rbmq_ext = config.RBMQ_EXT
                Rbmq_que = config.RBMQ_QUE
                Rbmq_rk = config.RBMQ_RK
            }
    
        }
    
        //fmt.Println(Rbmq_host,Rbmq_pt,Rbmq_un,Rbmq_pw,Rbmq_vh,Rbmq_ex,Rbmq_ext,Rbmq_que,Rbmq_rk)
        conn, err := amqp.Dial("amqp://"+Rbmq_un+":"+Rbmq_pw+"@"+Rbmq_host+":"+Rbmq_pt+"/"+Rbmq_vh)
    
        if err != nil {
            llog.Error(err, "Failed to connect to RabbitMQ")
        }
        defer conn.Close()
    
        ch, err := conn.Channel()
        if err != nil {
            llog.Error(err, "Failed to open a channel")
        }
        defer ch.Close()
    
        err1 := ch.ExchangeDeclare(
            Rbmq_ex,     // name of the exchange
            Rbmq_ext, // type
            true,         // durable
            false,        // delete when complete
            false,        // internal
            false,        // noWait
            nil,          // arguments
        )
        if err1 != nil {
            llog.Error(err1, "Exchange Declare")
        }
    
        q, err := ch.QueueDeclare(
            Rbmq_que, // name
            false,   // durable
            false,   // delete when unused
            false,   // exclusive
            false,   // no-wait
            nil,     // arguments
        )
        if err != nil {
            llog.Error(err, "Failed to declare a queue")
        }
    
        err2 := ch.QueueBind(
            q.Name,
            Rbmq_rk,
            Rbmq_ex,
            false,
            nil,
        )
        if err2 != nil {
            llog.Error(err2, "Failed to quene bind")
        }
    
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        if err != nil {
            llog.Error(err, "Failed to register a consumer")
        }
    
        forever := make(chan bool)
    
        go func() {
            for d := range msgs {
                msg := string(d.Body)
                llog.Info("Received a message: ", msg)
                Msgid_rwLock.Lock()
                msgid++
                controller.SendMsgToTaskQueue(msgid,&msg)    //丢给进程池,以msgid值处理
                Msgid_rwLock.Unlock()
            }
        }()
        <-forever
    }

    二 生产者

      因为本人项目只涉及到消费者,故生产者代码为原始示例

      

    package main
    
    import (
        "fmt"
        "log"
        "time"
    
        "github.com/streadway/amqp"
    )
    
    // failOnError1 does nothing when err is nil. Otherwise it logs msg followed
    // by the error through log.Fatalf, which terminates the program.
    func failOnError1(err error, msg string) {
        if err == nil {
            return
        }
        log.Fatalf("%s: %s", msg, err)
    }
    
    // main is the sample producer. It connects to RabbitMQ, declares a durable
    // direct exchange and a queue, and then publishes "Hello World!" to the
    // exchange with routing key "route_key" every five seconds until a publish
    // fails.
    //
    // It dials localhost, so it can only be run on the server where RabbitMQ is
    // installed.
    func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/vhost")
        failOnError1(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError1(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
            "exchange_name", // name of the exchange
            "direct",        // type
            true,            // durable
            false,           // delete when complete
            false,           // internal
            false,           // noWait
            nil,             // arguments
        )
        failOnError1(err, "Exchange Declare")

        // The queue is only declared here; this program makes no QueueBind call.
        // NOTE(review): presumably the binding to "route_key" is created by the
        // consumer side - confirm before relying on delivery.
        q, err := ch.QueueDeclare(
            "queue_name", // name
            false,        // durable
            false,        // delete when unused
            false,        // exclusive
            false,        // no-wait
            nil,          // arguments
        )
        failOnError1(err, "Failed to declare a queue")
        fmt.Println(q)

        body := "Hello World!"
        for {
            err = ch.Publish(
                "exchange_name", // exchange
                "route_key",     // routing key
                false,           // mandatory
                false,           // immediate
                amqp.Publishing{
                    ContentType: "text/plain",
                    Body:        []byte(body),
                })
            // Check the result before reporting success, so a failed publish is
            // never logged as sent.
            failOnError1(err, "Failed to publish a message")
            log.Printf(" [x] Sent %s", body)

            time.Sleep(time.Second * 5)
        }
    }

    三 展示

     

  • 相关阅读:
    hdu 1088 HTML解析
    hdu1171 转化01背包,组合
    Java编程优化之旅(一)一般化方法
    Java简单实现Socket非阻塞通信
    Maven安装,以及导入Intellij IDEA
    笔记本的使用技巧
    Intellij IDEA使用小技巧
    学习Spring有关知识
    学习安装IntelliJ IDEA
    C#后台调用js方法无效果,未解决。
  • 原文地址:https://www.cnblogs.com/bushuwei/p/15504027.html
Copyright © 2011-2022 走看看