zoukankan      html  css  js  c++  java
  • Golang使用RabbitMQ

    本文主要记录在go语言中使用RabbitMQ的相关示例代码

    关于RabbitMQ的基础知识参考:https://www.cnblogs.com/williamjie/p/9481774.html

    RabbitMQ

    本实例采用RabbitMQ中的订阅模型(Fanout、Direct、Topic)中的Direct消息模型

    在RabbitMQ中,无论是生产者和消费者实际上都属于Client。一个Client发送消息,哪些Client可以收到消息,其核心就在于Exchange,RoutingKey,Queue的关系上

    对于mq使用针对消费者和生产者来,主要步骤如下:

    首先都需要的步骤(示例代码中这一过程封装到提供初始化功能的库文件中):

    • 创建连接Conn
    • 创建通道Channel

    生产者:

    • 通过Channel声明Queue
    • 通过Channel声明Exchange(需指定Exchange type)
    • 创建Binding(指定一个BindingKey将Queue绑定到Exchange上)
    • 发送消息(需指定RoutingKey和Exchange)

    消费者:

    • 通过Channel声明Queue
    • 从Queue中取消息

    在两个角色中:

    • 生产者需要关注的是Exchange名称(因为消息需要指定发送到哪个Exchange)以及Exchange和Queue的绑定关系即Binding,所以下面的示例代码中将Exchange的定义以及Binding关系都写在了生产者中(实际上这里的代码可以放到功能库中,因为在项目中,这些关系都是通过配置的方式提前写好的)
    • 消费者只需要关注自己指定的一个Queue,从其中取消息,它对什么交换器,RoutingKey、Binding应该秉持关我毛事的态度

    编写MQ初始化库

    库提供的功能因人而异设计

    package rabbitMq
    
    import (
    	"log"
    
    	"github.com/streadway/amqp"
    ) //导入mq包
    
    // MQURL 格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost (默认是5672端口)
    // 端口可在 /etc/rabbitmq/rabbitmq-env.conf 配置文件设置,也可以启动后通过netstat -tlnp查看
    const MQURL = "amqp://admin:huan91uncc@172.21.138.131:5672/"
    
    type RabbitMQ struct {
    	Conn    *amqp.Connection
    	Channel *amqp.Channel
    	// 队列名称
    	QueueName string
    	// 交换机
    	Exchange string
    	// routing Key
    	RoutingKey string
    	//MQ链接字符串
    	Mqurl string
    }
    
    // 创建结构体实例
    func NewRabbitMQ(queueName, exchange, routingKey string) *RabbitMQ {
    	rabbitMQ := RabbitMQ{
    		QueueName:  queueName,
    		Exchange:   exchange,
    		RoutingKey: routingKey,
    		Mqurl:      MQURL,
    	}
    	var err error
    	//创建rabbitmq连接
    	rabbitMQ.Conn, err = amqp.Dial(rabbitMQ.Mqurl)
    	checkErr(err, "创建连接失败")
    
    	//创建Channel
    	rabbitMQ.Channel, err = rabbitMQ.Conn.Channel()
    	checkErr(err, "创建channel失败")
    
    	return &rabbitMQ
    
    }
    
    // 释放资源,建议NewRabbitMQ获取实例后 配合defer使用
    func (mq *RabbitMQ) ReleaseRes() {
    	mq.Conn.Close()
    	mq.Channel.Close()
    }
    
    func checkErr(err error, meg string) {
    	if err != nil {
    		log.Fatalf("%s:%s
    ", meg, err)
    	}
    }
    
    

    生产者

    package main
    
    import (
    	"fmt"
    	"mq/rabbitMq"
    
    	"github.com/streadway/amqp"
    )
    
    //生产者发布流程
    func main() {
    	// 初始化mq
    	mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
    	defer mq.ReleaseRes() // 完成任务释放资源
    
    	// 1.声明队列
    	/*
    		如果只有一方声明队列,可能会导致下面的情况:
    			a)消费者是无法订阅或者获取不存在的MessageQueue中信息
    			b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃
    
    		为了避免上面的问题,所以最好选择两方一起声明
    		ps:如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的
    	*/
    	_, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
    		mq.QueueName, // 队列名
    		true,         // 是否持久化
    		false,        // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
    		false,        // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
    		false,        // 是否阻塞
    		nil,          //额外属性(我还不会用)
    	)
    	if err != nil {
    		fmt.Println("声明队列失败", err)
    		return
    	}
    
    	// 2.声明交换器
    	err = mq.Channel.ExchangeDeclare(
    		mq.Exchange, //交换器名
    		"topic",     //exchange type:一般用fanout、direct、topic
    		true,        // 是否持久化
    		false,       //是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑)
    		false,       //设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
    		false,       // 是否阻塞
    		nil,         // 额外属性
    	)
    	if err != nil {
    		fmt.Println("声明交换器失败", err)
    		return
    	}
    
    	// 3.建立Binding(可随心所欲建立多个绑定关系)
    	err = mq.Channel.QueueBind(
    		mq.QueueName,  // 绑定的队列名称
    		mq.RoutingKey, // bindkey 用于消息路由分发的key
    		mq.Exchange,   // 绑定的exchange名
    		false,         // 是否阻塞
    		nil,           // 额外属性
    	)
    	// err = mq.Channel.QueueBind(
    	// 	mq.QueueName,  // 绑定的队列名称
    	// 	"routingkey2", // bindkey 用于消息路由分发的key
    	// 	mq.Exchange,   // 绑定的exchange名
    	// 	false,         // 是否阻塞
    	// 	nil,           // 额外属性
    	// )
    	if err != nil {
    		fmt.Println("绑定队列和交换器失败", err)
    		return
    	}
    
    	// 4.发送消息
    	mq.Channel.Publish(
    		mq.Exchange,   // 交换器名
    		mq.RoutingKey, // routing key
    		false,         // 是否返回消息(匹配队列),如果为true, 会根据binding规则匹配queue,如未匹配queue,则把发送的消息返回给发送者
    		false,         // 是否返回消息(匹配消费者),如果为true, 消息发送到queue后发现没有绑定消费者,则把发送的消息返回给发送者
    		amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象
    			ContentType: "text/plain",           // 消息内容的类型
    			Body:        []byte("hello jochen"), // 消息内容
    		},
    	)
    }
    
    

    消费者

    package main
    
    import (
    	"fmt"
    	"mq/rabbitMq"
    )
    
    // 消费者订阅
    func main() {
    	// 初始化mq
    	mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
    	defer mq.ReleaseRes() // 完成任务释放资源
    
    	// 1.声明队列(两端都要声明,原因在生产者处已经说明)
    	_, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
    		mq.QueueName, // 队列名
    		true,         // 是否持久化
    		false,        // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
    		false,        // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
    		false,        // 是否阻塞
    		nil,          // 额外属性(我还不会用)
    	)
    	if err != nil {
    		fmt.Println("声明队列失败", err)
    		return
    	}
    
    	// 2.从队列获取消息(消费者只关注队列)consume方式会不断的从队列中获取消息
    	msgChanl, err := mq.Channel.Consume(
    		mq.QueueName, // 队列名
    		"",           // 消费者名,用来区分多个消费者,以实现公平分发或均等分发策略
    		true,         // 是否自动应答
    		false,        // 是否排他
    		false,        // 是否接收只同一个连接中的消息,若为true,则只能接收别的conn中发送的消息
    		true,         // 队列消费是否阻塞
    		nil,          // 额外属性
    	)
    	if err != nil {
    		fmt.Println("获取消息失败", err)
    		return
    	}
    
    	for msg := range msgChanl {
    		// 这里写你的处理逻辑
    		// 获取到的消息是amqp.Delivery对象,从中可以获取消息信息
    		fmt.Println(string(msg.Body))
    		// msg.Ack(true) // 主动应答
    
    	}
    
    }
    
    

    web管理界面查看结果

    关于rabbitMQ的web管理界面如何使用可以看这里

    1. 连接信息:

    2. channel信息:

    3. 交换器信息:

    4. 队列信息:

    5. Binding信息
      入口:Exchange -> 点击想看的交换器

  • 相关阅读:
    TP5 关联模型使用(嵌套关联、动态排序以及隐藏字段)
    分布式与集群的区别是什么?
    模板函数函数模板 Function Template(C++Primer10)
    注意地方hadoop中的pi值计算
    文件错误关于hibernate中报Duplicate class/entity mapping org.model.User错的问题
    元素序列几个常用排序算法:一
    行语句mysql insert操作详解
    分量入度hdu 3836 Equivalent Sets
    查询语句编写高效SQL语句
    方法元素c语言范式编程之lsearch
  • 原文地址:https://www.cnblogs.com/deehuang/p/14632895.html
Copyright © 2011-2022 走看看