zoukankan      html  css  js  c++  java
  • golang操作RabbitMQ--订阅模式

    创建连接及RabbitMQ结构体实例代码见 https://www.cnblogs.com/prince5460/p/11895844.html

    1.创建RabbitMQ订阅模式实例

    func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
    	//创建RabbitMQ实例
    	rabbitmq := NewRabbitMQ("", exchangeName, "")
    	var err error
    	//获取connection
    	rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    	rabbitmq.failOnErr(err, "Failed to connect rabbitmq!")
    	//获取channel
    	rabbitmq.channel, err = rabbitmq.conn.Channel()
    	rabbitmq.failOnErr(err, "Failed to open a channel!")
    	return rabbitmq
    }
    

    2.生产者

    func (r *RabbitMQ) PublishPub(message string) {
    	//1.尝试创建交换机
    	err := r.channel.ExchangeDeclare(
    		r.Exchange,
    		//订阅模式下为广播类型
    		"fanout",
    		true,
    		false,
    		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
    		false,
    		false,
    		nil,
    	)
    	r.failOnErr(err, "Failed to declare an exchange!")
    
    	//2.发送消息
    	err = r.channel.Publish(
    		r.Exchange,
    		"",
    		false,
    		false,
    		amqp.Publishing{
    			ContentType: "text/plain",
    			Body:        []byte(message),
    		})
    }
    

    3.消费者

    func (r *RabbitMQ) ReceiveSub() {
    	//1.试探性创建交换机
    	err := r.channel.ExchangeDeclare(
    		r.Exchange,
    		//交换机类型
    		"fanout",
    		true,
    		false,
    		//true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
    		false,
    		false,
    		nil,
    	)
    	r.failOnErr(err, "Failed to declare an exchange!")
    
    	//2.试探性创建队列,注意队列名称不要写
    	q, err := r.channel.QueueDeclare(
    		"", //随机生成队列名称
    		false,
    		false,
    		true,
    		false,
    		nil,
    	)
    	r.failOnErr(err, "Failed to declare an exchange!")
    
    	//3.绑定队列到exchange中
    	err = r.channel.QueueBind(
    		q.Name,
    		//在pub/sub模式下,这里的key要为空
    		"",
    		r.Exchange,
    		false,
    		nil,
    	)
    
    	//4.消费消息
    	messages, err := r.channel.Consume(
    		q.Name,
    		"",
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    
    	forever := make(chan bool)
    
    	go func() {
    		for d := range messages {
    			log.Printf("Received a message :%s", d.Body)
    		}
    	}()
    
    	fmt.Println("[*] Waiting for messages,To exit press CTRL+C")
    	<-forever
    }
    

    4.测试代码

    • Publish
    package main
    
    import (
    	"fmt"
    	"go-rabbitmq/RabbitMQ"
    	"strconv"
    	"time"
    )
    
    func main() {
    	rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
    
    	for i := 0; i <= 100; i++ {
    		rabbitmq.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据")
    		fmt.Println("订阅模式生产第" + strconv.Itoa(i) + "条数据")
    		time.Sleep(time.Second)
    	}
    }
    
    
    • Subscribe1
    package main
    
    import "go-rabbitmq/RabbitMQ"
    
    func main() {
    	rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
    	rabbitmq.ReceiveSub()
    }
    
    
    • Subscribe2
    package main
    
    import "go-rabbitmq/RabbitMQ"
    
    func main() {
    	rabbitmq := RabbitMQ.NewRabbitMQPubSub("newProduct")
    	rabbitmq.ReceiveSub()
    }
    
    
  • 相关阅读:
    第4次作业(条件)比较大小。第3次作业(条件)计算火车运行时间。
    GitHub搜索技巧
    flex实现左中固定不变,右边自适应
    JavaScript高级__原型继承+组合继承
    JavaScript高级__深入了解闭包
    JavaScript高级__执行上下文代码案例
    JavaScript中的显式原型链 prototype 和隐式原型 __proto__
    谷歌强大插件收集,持续更新。。。
    js中~~和^=
    vue自定义指令----directive
  • 原文地址:https://www.cnblogs.com/prince5460/p/11895881.html
Copyright © 2011-2022 走看看