zoukankan      html  css  js  c++  java
  • golang操作RabbitMQ--话题模式

    创建连接及RabbitMQ结构体实例代码见 https://www.cnblogs.com/prince5460/p/11895844.html

    1.创建话题模式RabbitMQ实例

    func NewRabbitMQTopic(exchangeName, routingKey string) *RabbitMQ {
    	//创建RabbitMQ实例
    	rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
    	var err error
    	//获取connection
    	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    	rabbitmq.failOnErr(err, "Failed to connect rabbitmq!")
    	//获取channel
    	rabbitmq.channel, err = rabbitmq.conn.Channel()
    	rabbitmq.failOnErr(err, "Failed to open a channel!")
    	return rabbitmq
    }
    

    2.话题模式发送消息

    func (r *RabbitMQ) PublishTopic(message string) {
    	//1.尝试创建交换机
    	err := r.channel.ExchangeDeclare(
    		r.Exchange,
    		//要改成topic
    		"topic",
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	r.failOnErr(err, "Failed to declare an exchange!")
    
    	//2.发送消息
    	err = r.channel.Publish(
    		r.Exchange,
    		//要设置
    		r.key,
    		false,
    		false,
    		amqp.Publishing{
    			ContentType: "text/plain",
    			Body:        []byte(message),
    		})
    }
    

    3.话题模式接收消息

    //要注意key规则
    //其中"*"用于匹配一个单词,"#"用于匹配多个单词(可以是零个)
    //匹配test.*表示匹配test.hello,但是test.hello.one需要用test.#才能匹配到
    func (r *RabbitMQ) ReceiveTopic() {
    	//1.试探性创建交换机
    	err := r.channel.ExchangeDeclare(
    		r.Exchange,
    		//交换机类型
    		"topic",
    		true,
    		false,
    		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
    		false,
    		false,
    		nil,
    	)
    	r.failOnErr(err, "Failed to declare an exchange!")
    
    	//2.试探性创建队列,注意队列名称不要写
    	q, err := r.channel.QueueDeclare(
    		"", //随机生成队列名称
    		false,
    		false,
    		true,
    		false,
    		nil,
    	)
    	r.failOnErr(err, "Failed to declare an exchange!")
    
    	//3.绑定队列到exchange中
    	err = r.channel.QueueBind(
    		q.Name,
    		//需要绑定key
    		r.key,
    		r.Exchange,
    		false,
    		nil,
    	)
    
    	//4.消费消息
    	messages, err := r.channel.Consume(
    		q.Name,
    		"",
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    
    	forever := make(chan bool)
    
    	go func() {
    		for d := range messages {
    			log.Printf("Received a message :%s", d.Body)
    		}
    	}()
    
    	fmt.Println("[*] Waiting for messages,To exit press CTRL+C")
    	<-forever
    }
    

    4.测试代码

    • Publish
    package main
    
    import (
    	"fmt"
    	"go-rabbitmq/RabbitMQ"
    	"strconv"
    	"time"
    )
    
    func main() {
    	testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.topic.one")
    	testTwo := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.topic.two")
    	for i := 0; i <= 10; i++ {
    		testOne.PublishTopic("Hello test topic one:" + strconv.Itoa(i))
    		testTwo.PublishTopic("Hello test topic two:" + strconv.Itoa(i))
    		time.Sleep(time.Second)
    		fmt.Println("publish:", i)
    	}
    }
    
    • ReceiveAll
    package main
    
    import "go-rabbitmq/RabbitMQ"
    
    func main() {
    	testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic","#")
    	testOne.ReceiveTopic()
    }
    
    
    • ReveiveOne
    package main
    
    import "go-rabbitmq/RabbitMQ"
    
    func main() {
    	testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.*.one")
    	testOne.ReceiveTopic()
    }
    
    
  • 相关阅读:
    Spring Boot 2 (七):Spring Boot 如何解决项目启动时初始化资源
    Spring Boot 2 (八):Spring Boot 集成 Memcached
    Spring Boot 2 (五):Docker Compose + Spring Boot + Nginx + Mysql 实践
    Spring Boot 2 (六):使用 Docker 部署 Spring Boot 开源软件云收藏
    Spring Boot 2 (四):使用 Docker 部署 Spring Boot
    微信开发中,不同手机系统遇到的bug(不定时更新)
    gulp-sourcemaps的用法
    实现输入框高度随内容变化
    微信开发,浏览器缓存问题
    mac中nvm的安装和使用
  • 原文地址:https://www.cnblogs.com/prince5460/p/11896026.html
Copyright © 2011-2022 走看看