Rabitmq的环境搭建
定义和特征
- RabbitMQ是面向消息的中间件,用于组件之间的解耦,主要体现在消息的发送者和消费者之间的强依赖问题。
- RabbitMQ特点:高可用,扩展性,多语言客户端,管理界面等
- RabbitMQ使用场景:流量削峰,异步处理,应用解耦等
安装命令
- 安装erlang:
#由于RabbitMQ采用Erlang语言编写,所以先安装Erlang sudo apt-get install erlang-nox #如果在安装过程中报错:ubuntu18.04 E: dpkg 被中断,您必须手工运行 ‘sudo dpkg –configure -a’ 解决此问题。如果使用之后不行,那么删除掉然后重建即可。sudo rm /var/lib/dpkg/updates/* #安装RabbitMQ sudo apt-get update sudo apt-get upgrade sudo apt-get install rabbitmq-server # 查看运行状态 systemctl status rabbitmq-server #Active: active (running) 说明处于运行状态 # service rabbitmq-server status 用service指令也可以查看,同systemctl指令 #启动、停止、重启 service rabbitmq-server start # 启动 service rabbitmq-server stop # 停止 service rabbitmq-server restart # 重启 #web界面 rabbitmq-plugins enable rabbitmq_management # 启用插件 service rabbitmq-server restart # 重启
- 运行后,可以访问http://localhost:15672, 并使用默认密码:guest/guest进入管理界
更对多rabbitmq资料:https://blog.csdn.net/vrg000/article/details/81165030
RabbitMQ命令管理
- rabbitmq-plugins list : 列出当前rabbitMQ可以使用的插件
- rabbitmq-plugins enable/disable rabbitmq_management : 启动/关闭相应的插件
RabbitMQ核心概念
- VirtualHost:在web页面中的"Admin"页中,点击"Virtual host", 再点击"Add new virtual host",填写一个名称后,会多出一个虚拟host
- 通过添加用户,然后将虚拟host绑定到特定的用户
- 虚拟host,起到了数据隔离的作用,指定用户只能在对应的host上访问
- 当一个用户可以使用多个虚拟host时,对应的exchanges也很很多,这样我们可以隔离我们的开发环境
- Exchanges:交换机,相当于路由的中转站,生产者生产好数据后会首先通过它访问的路径找到对应RabbitMQ的key,并发送到其中
- Queues: 用来绑定我们的交换机,接受我们的消息,只要没有消费者就一直存在队列中
- Binding: 将队列绑定到对应的交换机上,不同的绑定形式,可以实现不同的工作模式
6种工作模式
- Simple模式:
代码实现:
首先我们需要设置GOPATH路径,同时设置全局的代理:在~/.bashrc中添加如下内容,然后source .bashrc
export PATH=$PATH:/usr/local/go/bin export GOPATH=/home/wangli/桌面/Go_code export GOPROXY=https://goproxy.cn,direct
创建一个项目,进入项目目录后,在终端中输入:go mod init demo (demo)随意起名字,里面可以会自动监听我们使用的所有包,我们只需要使用demo/xxx导入包就可以了
(Goland中可以通过go Moudle来创建)
go-mod相关
Go现在通过 go.mod
文件来配置模块加载使用是非常方便了,下面说下如何在当前项目中加载另一个本地正在开发的模块:
1、便捷一:默认使用 github.com/zhouzme/snail-go 包会到 github 上去下载,但这个包还在本地开发中并未push到线上,那么可以通过 replace 配置来重定向当前项目对该包的加载路径:replace github.com/zhouzme/snail-go => E:Gosnail-go 。 这里 E:Gosnail-go 为本地包的绝对路径,这样写就可以了,当本地snail-go包代码修改后就可以在当前项目看到实时效果了,注意中间符号是 => 2、便捷二:go mod init mygo (这里mygo名字也可以叫其他名字,一般为了与项目名称对应,就用项目名字) 项目目录下会生成go.mod文件, 类似于python的requirements.txt文件。同时也生成一个go.sum文件,主要记载了下载包的哈希值用于校验,我们用不到。 3、便捷三:go.mod文件一旦创建后,它的内容将会被go toolchain全面掌控。 go toolchain会在各类命令执行时,比如执行go get、go build、go run、go mod等命令时,自动修改和维护go.mod文件,这点跟pip还是有区别的
go.mod中提供了以下各类命令
1、module 语句指定包的名字(路径) 2、require 语句指定的依赖项模块 3、replace 语句可以替换依赖项模块 4、exclude 语句可以忽略依赖项模块
go.mod由于某些已知的原因,并不是所有的package都能成功下载,比如:golang.org
下的包。 可以在 go.mod 文件中使用 replace 指令替换成github上对应的库,来下载相应的包。比如:
replace ( golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 => github.com/golang/cryptov0.0.0-20190701094942-4def268fd1a4 ) 或者: replace golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 => github.com/golang/crypto v0.0.0-20190701094942-4def268fd1a4
go.mod相关使用命令
1. go mod init 初始化一个go.mod文件到当前目录, 实际上是创建了一个以当前目录为模块的mod。 2.go mod tidy 类似maven update, 通过这个命令下载项目依赖的第三方库,同时会去掉不相关的库。 3.go mod vender 把第三方的库拷贝到当前目录的vendor目录 4.go mod verify 检查下载的第三方库有没有本地修改,如果有修改,则会返回非0,否则验证成功。
项目代码
- RabbitMQ/rabbitmq.go
package RabbitMQ import ( "fmt" "github.com/streadway/amqp" "log" ) // 创建连接url const MQURL = "amqp://admin:admin@127.0.0.1:5672/test" type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel // 队列名称 QueueName string // 交换机 Exchange string // key Key string // 连接信息 Mqurl string } // 创建连接实例 func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ { // exchange 为空会使用默认的default rabbitmq := &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL} var err error // 创建rabbitmq来连接 rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnErr(err, "创建连接错误!") // 创建channel rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnErr(err, "获取channel失败") return rabbitmq } // 断开连接:channel和conn func (r *RabbitMQ) Destory() { r.channel.Close() r.conn.Close() } // 错误处理的函数 func (r *RabbitMQ) failOnErr(err error, message string) { if err != nil { log.Fatalf("%s: %s", message, err) panic(fmt.Sprintf("%s", message)) } } // step1: simple style 创建简单模式的实例, 只需要队列名 func NewRabbitMQSimple(queueName string) *RabbitMQ { return NewRabbitMQ(queueName, "", "") } // step2: 简单模式下生产 func (r *RabbitMQ) PublishSimple(message string) { // 固定用法 申请队列,如果队列不存在会自动创建,如果存在则直接使用,保证队列中能存入数据 _, err := r.channel.QueueDeclare( r.QueueName, // 队列名 false, // 控制是否持久化 false, // 是否自动删除,当最后一个消费者断开连接后是否删除 false, // 是否具有排他性,其他用户不可访问 false, // 是否阻塞 nil, // 额外属性 ) if err != nil { fmt.Println(err) } // 发送消息到队列中 r.channel.Publish( r.Exchange, r.QueueName, false, // mandatory 如果为true,会根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把消息返回给发送者 false, // immediate 如果为true,当exchange发送消息到队列后发现队列没有绑定消费者后会把消息返回 amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) } // 简单模式的消费消息 func (r *RabbitMQ) ConsumeSimple() { // 固定用法 申请队列,如果队列不存在会自动创建,如果存在则直接使用,保证队列中能存入数据 _, err := r.channel.QueueDeclare( r.QueueName, // 队列名 false, // 控制是否持久化 false, // 是否自动删除,当最后一个消费者断开连接后是否删除 false, // 是否具有排他性,其他用户不可访问 false, // 是否阻塞 nil, // 额外属性 ) if err != nil { fmt.Println(err) } // 接受消息 msgs, err := r.channel.Consume( r.QueueName, "", // 用来区分多个消费在 true, // 是否自动应答, 主动的告诉mq自己已经消费完了,如果false,需要回调函数 false, // 是否排他性 false, // 如果设置为true, 表示不能将同一个connection中发送的消息传递给这个connect中的消费者 false, // 设置为阻塞,一个一个消费 nil, // 附加信息 ) if err != nil { fmt.Println(err) } // 消费时的固定写法,用来阻塞 forever := make(chan bool) // 启用协程处理消息 go func() { for d := range msgs { // 实现我们要处理的逻辑函数 log.Printf("Received a message: %s", d.Body) } }() log.Printf("[*] Waiting for message,To exit press CTRL + C") <- forever }
- mainSimplePublish.go
package main import ( "demo/RabbitMQ" "fmt" ) func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple") rabbitmq.PublishSimple("Hello Test! ") fmt.Println("发送成功!") }
- mainSimpleRecieve.go
package main import ( "demo/RabbitMQ" ) func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("testSimple") rabbitmq.ConsumeSimple() }
运行后可以查看web界面的使用图