zoukankan      html  css  js  c++  java
  • golang操作RabbitMQ--话题模式

    创建连接及RabbitMQ结构体实例代码见 https://www.cnblogs.com/prince5460/p/11895844.html

    1.创建话题模式RabbitMQ实例

    func NewRabbitMQTopic(exchangeName, routingKey string) *RabbitMQ {
    	//创建RabbitMQ实例
    	rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
    	var err error
    	//获取connection
    	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    	rabbitmq.failOnErr(err, "Failed to connect rabbitmq!")
    	//获取channel
    	rabbitmq.channel, err = rabbitmq.conn.Channel()
    	rabbitmq.failOnErr(err, "Failed to open a channel!")
    	return rabbitmq
    }
    

    2.话题模式发送消息

    func (r *RabbitMQ) PublishTopic(message string) {
    	//1.尝试创建交换机
    	err := r.channel.ExchangeDeclare(
    		r.Exchange,
    		//要改成topic
    		"topic",
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	r.failOnErr(err, "Failed to declare an exchange!")
    
    	//2.发送消息
    	err = r.channel.Publish(
    		r.Exchange,
    		//要设置
    		r.key,
    		false,
    		false,
    		amqp.Publishing{
    			ContentType: "text/plain",
    			Body:        []byte(message),
    		})
    }
    

    3.话题模式接收消息

    //要注意key规则
    //其中"*"用于匹配一个单词,"#"用于匹配多个单词(可以是零个)
    //匹配test.*表示匹配test.hello,但是test.hello.one需要用test.#才能匹配到
    func (r *RabbitMQ) ReceiveTopic() {
    	//1.试探性创建交换机
    	err := r.channel.ExchangeDeclare(
    		r.Exchange,
    		//交换机类型
    		"topic",
    		true,
    		false,
    		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
    		false,
    		false,
    		nil,
    	)
    	r.failOnErr(err, "Failed to declare an exchange!")
    
    	//2.试探性创建队列,注意队列名称不要写
    	q, err := r.channel.QueueDeclare(
    		"", //随机生成队列名称
    		false,
    		false,
    		true,
    		false,
    		nil,
    	)
    	r.failOnErr(err, "Failed to declare an exchange!")
    
    	//3.绑定队列到exchange中
    	err = r.channel.QueueBind(
    		q.Name,
    		//需要绑定key
    		r.key,
    		r.Exchange,
    		false,
    		nil,
    	)
    
    	//4.消费消息
    	messages, err := r.channel.Consume(
    		q.Name,
    		"",
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    
    	forever := make(chan bool)
    
    	go func() {
    		for d := range messages {
    			log.Printf("Received a message :%s", d.Body)
    		}
    	}()
    
    	fmt.Println("[*] Waiting for messages,To exit press CTRL+C")
    	<-forever
    }
    

    4.测试代码

    • Publish
    package main
    
    import (
    	"fmt"
    	"go-rabbitmq/RabbitMQ"
    	"strconv"
    	"time"
    )
    
    func main() {
    	testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.topic.one")
    	testTwo := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.topic.two")
    	for i := 0; i <= 10; i++ {
    		testOne.PublishTopic("Hello test topic one:" + strconv.Itoa(i))
    		testTwo.PublishTopic("Hello test topic two:" + strconv.Itoa(i))
    		time.Sleep(time.Second)
    		fmt.Println("publish:", i)
    	}
    }
    
    • ReceiveAll
    package main
    
    import "go-rabbitmq/RabbitMQ"
    
    func main() {
    	testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic","#")
    	testOne.ReceiveTopic()
    }
    
    
    • ReveiveOne
    package main
    
    import "go-rabbitmq/RabbitMQ"
    
    func main() {
    	testOne := RabbitMQ.NewRabbitMQTopic("exTestTopic", "test.*.one")
    	testOne.ReceiveTopic()
    }
    
    
  • 相关阅读:
    java
    EL表达式详解
    SVN的安装与配置
    javascript高级程序设计学习笔记
    java基础知识
    javascript高级程序设计学习笔记Chapter 5: Reference Types
    javascript模态,非模态窗体
    javascript执行顺序
    javascript的执行顺序2
    自动补全+汉字拼音双查(1)数据库
  • 原文地址:https://www.cnblogs.com/prince5460/p/11896026.html
Copyright © 2011-2022 走看看