zoukankan      html  css  js  c++  java
  • Golang使用RabbitMQ消息中间件amqp协议

    "github.com/streadway/amqp"

    Publish发布

    // amqp://<user>:<password>@<ip>:<port>
    var addr = "amqp://test:test123@127.0.0.1:5672" //test
    
    func main() {
          
        // 建立连接
        conn, err := amqp.Dial(addr)
        if nil != err {
    	logs.Error(err)
    	return
        }
    
        defer conn.Close()
        // 申请通道
        ch, err := conn.Channel()
        if nil != err {
    	    logs.Error(err)
    	    return
        }
    
        defer ch.Close()
        // 定义交换“direct”、“fanout”、“topic”和“headers”
        err = ch.ExchangeDeclare("happy", amqp.ExchangeTopic, true, false, false, false, nil)
        if nil != err {
    	    logs.Error(err)
    	    return
        }
    
        data = fmt.Sprintf("hello,world!!!")
            //a.b.c.d.e 为发布key,以.分割;
        err = ch.Publish("happy", "a.b.c.d.e", false, false,
    	    amqp.Publishing{
    		    ContentType:  "text/plain",
    		    Body:         []byte(data),
    		    DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
    	    })
        if nil != err {
    	    logs.Error(err)
    	    return
        }
    
    }
    

    Recover 接收

    // amqp://<user>:<password>@<ip>:<port>
    var addr = "amqp://test:test123@127.0.0.1:5672" //test
    
    func main() {
    
        // 建立连接
        conn, err := amqp.Dial(addr)
        if nil != err {
                logs.Error(err)
    	    return
        }
    
        defer conn.Close()
        // 申请通道
        ch, err := conn.Channel()
        if nil != err {
    	    logs.Error(err)
    	    return
        }
    
        defer ch.Close()
        // 定义交换
        err = ch.ExchangeDeclare("happy", amqp.ExchangeTopic, true, false, false, false, nil)
        if nil != err {
    	    logs.Error(err)
    	    return
        }
    
        queName := "test.test1.test2"
        topic := "a.#"
    
    
        // 定义通道
        que, err := ch.QueueDeclare(queName, false, false, false, false, nil)
        if nil != err {
        	    logs.Error(err)
        }
    
        err = ch.QueueBind(que.Name, topic, "happy", false, nil)
        if nil != err {
    	    logs.Error(err)
    	    return
        }
    
        msges, err := ch.Consume(que.Name, "", true, false, false, false, nil)
        if nil != err {
    	    logs.Error(err)
    	    return
        }
    
        logs.Info("start recv")
    
        for msg := range msges {
    	    fmt.Println(">>> %s", string(msg.Body))
    	
        }
    
    }
    

    http管理端口是15672

    注:队列应先注册一次,才能收到消息

  • 相关阅读:
    Java 技术笔记
    idea启动TOMCAT html 乱码
    IntelliJ IDEA 导入新项目
    InterlliJ Debug方式启动:method breakpoints may dramatically show down debugging
    intelliJ idea #region 代码折叠
    Console 程序在任务计划程序无法读写文件
    Java 发送邮件
    MySQL 索引
    MySQL 临时表
    11 帧差法获取运动
  • 原文地址:https://www.cnblogs.com/TSlover/p/11613759.html
Copyright © 2011-2022 走看看