1 RabbitMQ简介
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,官网地址:http://www.rabbitmq.com。RabbitMQ作为一个消息代理,主要负责接收、存储和转发消息,它提供了可靠的消息机制和灵活的消息路由,并支持消息集群和分布式部署,常用于应用解耦,耗时任务队列,流量削锋等场景。本系列文章将系统介绍RabbitMQ的工作机制,代码驱动和集群配置,本篇主要介绍RabbitMQ中一些基本概念,常用的RabbitMQ Control命令,最后写一个C#驱动的简单栗子。先看一下RabbitMQ的基本结构:
上图是RabbitMQ的一个基本结构,生产者Producer和消费者Consumer都是RabbitMQ的客户端,Producer负责发送消息,Consumer负责消费消息。
接下来我们结合这张图来理解RabbitMQ的一些概念:
Broker(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程,我们可以把Broker叫做RabbitMQ服务器。
Virtual Host:一个虚拟概念,一个Virtual Host里面可以有若干个Exchange和Queue,主要用于权限控制,隔离应用。如应用程序A使用VhostA,应用程序B使用VhostB,那么我们在VhostA中只存放应用程序A的exchange,queue和消息,应用程序A的用户只能访问VhostA,不能访问VhostB中的数据。
Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四种,不同类型的Exchange路由规则是不一样的(这些以后会详细介绍)。
Queue:消息队列,用于存储还未被消费者消费的消息,队列是先进先出的,默认情况下先存储的消息先被处理。
Message:就是消息,由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等,Body是真正传输的数据,内容格式为byte[]。
Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
Channel:信道,仅仅创建了客户端到Broker之间的连接Connection后,客户端还是不能发送消息的。需要在Connection的基础上创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的。
2 RabbitMQ安装
因为RabbitMQ是用erlang语言开发的,所以我们在安装RabbitMQ前必须要安装erlang支持。
1 Windows平台安装
1 安装erlang
首先下载erlang,直接下载最新版本,当前下载的是 OTP 21.3 Windows 64-bit Binary File ,下载完成后一直下一步安装即可。
2 安装RabbitMQ
下载Windows平台的RabbtMQ,当前下载的是 rabbitmq-server-3.7.14.exe ,下载完成后一直下一步安装即可。
3 安装Web管理插件
打开RabbitMQ Command Prompt,执行命令 rabbitmq-plugins enable rabbitmq_management 即可完成Web监控插件的安装。
安装完成后,打开浏览器输入 http://127.0.0.1:15672/ 使用默认账号[ name:guest / password:guest ]登录后界面如下,使用这个UI插件我们可以轻松的查看RabbitMQ中的交换机(exchange),队列(queue)等内容,也可以对exchange,queue,用户等进行添加、修改、删除操作。
到这一步Windows平台安装RabbitMQ完成了。 打开服务管理器,RabbitMQ已经在正常运行了,如下:
2 Centos安装RabbitMQ
1 安装RabbitMQ
这里虚拟机系统为Centos7,采用的安装方式是yum安装,为了简单,这里直接使用官方提供的erlang和RabbitMQ-server的自动安装脚本(官方安装文档),逐行执行下边的代码就可以安装完成erlang和RabbitMQ。
#安装socat yum install socat #安装erlang curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash yum -y install erlang #安装rabbitmq-server curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash yum -y install rabbitmq-server #启动rabbitmq服务 systemctl start rabbitmq-server #添加web管理插件 rabbitmq-plugins enable rabbitmq_management
补充:如果安装完成后,执行RabbitMQ执行命令特别慢,或者出现报错【rabbitmq unable to perform an operation on node xxx@xxx】,解决方法:
编辑hosts,执行命令 vim /etc/hosts ,添加本机IP(或者虚拟机IP)
命令执行结束后,使用浏览器访问 http://127.0.0.1:15672/ 也会出现web管理界面。通过上边的安装步骤安装的RabbitMQ会生成Unit文件,所以我们可以使用Systemd管理RabbitMQ服务,以下是几条常用的命令:
#启动RabbitMQ服务 systemctl start rabbitmq-server #停止RabbitMQ服务 systemctl stop rabbitmq-server #查看RabbitMQ运行状态 systemctl status rabbitmq-server #重启RabbitMQ服务 systemctl restart rabbitmq-server
2 RabbitMQ Control工具
使用Web管理界面可以实现RabbitMQ的大部分常用功能,但是有些功能WebUI是做不到的,如:开启/关闭RabbitMQ应用程序和集群的管理等。RabbitMQ Control是RabbitMQ的命令行管理工具,可以调用所有的RabbitMQ内置功能,主命令是rabbitmqctl ,下边是一个查询用户列表的命令,注意需要切换到sbin目录下执行:
为了方便的使用RabbitMQ Control工具,我们最好添加一个环境变量,Windows默认安装时在PATH中添加一条: C:Program FilesRabbitMQ Server abbitmq_server-3.7.14sbin ,不是默认安装的话找到对应的安装目录添加PATH。按照上的安装方法,Centos可以直接使用RabbitMQ Control工具,不需要多余的配置。如果想详细了解RabbitMQ Control工具,可以参考RabbitMQ Control的官方文档。
这里总结了一些最常用到的RabbitMQ Controll命令,有兴趣的小伙伴可以试着运行一下这些命令,如在命令行工具中使用命令 rabbitmqctl add_user <username> <password> 添加一个新用户。
1 基本控制命令
基本控制命令主要用于启动、停止应用程序、runtime等
#停止rabbitmq和runtime rabbitmqctl shutdown #停止erlang节点 rabbitmqctl stop #启用rabbitmq rabbitmqctl start_app #停止rabbitmq rabbitmqctl stop_app #查看状态 rabbitmqctl status #查看环境 rabbitmqctl environment #rabbitmq恢复最初状态,内部的exchange和queue都清除 rabbitmqctl reset
2 服务状态管理
这些命令主要用于用于查看exchang、channel、binding、queue、consumers:
#返回queue的信息 list_queues [-p <vhostpath>] [<queueinfoitem> ...] #返回exchange的信息 list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...] #返回绑定信息 list_bindings [-p <vhostpath>] [<bindinginfoitem> ...] #返回链接信息 list_connections [<connectioninfoitem> ...] #返回目前所有的channels list_channels [<channelinfoitem> ...] #返回consumers list_consumers [-p <vhostpath>]
3 用户管理命令
这些命令主要用于添加、修改、删除用户及管理用户权限
#在rabbitmq的内部数据库添加用户 add_user <username> <password> #删除一个用户 delete_user <username> #改变用户密码 change_password <username> <newpassword> #清除用户密码,禁止用户登录 clear_password <username> #设置用户tags,就是设置用户角色 set_user_tags <username> <tag> # 查看用户列表 list_users #创建一个vhost add_vhost <vhostpath> #删除一个vhosts delete_vhost <vhostpath> #列出vhosts list_vhosts [<vhostinfoitem> ...] #针对一个vhosts 给用户赋予相关权限 set_permissions [-p <vhostpath>] <user> <conf> <write> <read> #清除一个用户对vhost的权限 clear_permissions [-p <vhostpath>] <username> #列出所有用户对某一vhost的权限 list_permissions [-p <vhostpath>] #列出某用户的访问权限 list_user_permissions <username>
4 集群管理命令
#clusternode表示node名称,--ram表示node以ram node加入集群中。默认node以disc node加入集群,在一个node加入cluster之前,必须先停止该node的rabbitmq应用,即先执行stop_app。 join_cluster <clusternode> [--ram] #显示cluster中的所有node cluster_status #设置集群名字 set_cluster_name <clustername> #修改集群名字 rename_cluster_node <oldname> <newname> #改变一个cluster中node的模式,该节点在转换前必须先停止,不能把一个集群中唯一的disk node转化为ram node change_cluster_node_type <disc | ram> #远程删除一个节点,删除前必须该节点必须先停止 rabbitmqctl forget_cluster_node rabbit@rabbit1 #同步镜像队列 sync_queue <queuename> #取消同步队列 cancel_sync_queue <queuename> #清空队列中所有消息 purge_queue [-p vhost] <queuename>
这里列举的很多命令是现阶段用不到的,如集群控制相关的命令,这些命令的用法会在以后的章节中逐渐理解。
3 C#驱动RabbitMQ
1 一个简单的栗子
作为开发者,我们最在意的还是怎么在代码中使用RabbitMQ,可以通过官方RabbitMQ开发文档来学习RabbitMQ的使用,这里以.NET为例演示一下RabbitMQ的最基本用法。创建两个Console应用,一个作为发送消息的生产者(Producer),一个作为接受消息的消费者(Consumer),生产者向队列写入消息,消费者接受这条消息,结构如下:
两个控制台应用都要添加RabbitMQ.Client包,命令如下:
Install-Package RabbitMQ.Client
生产者(Producer)代码:
class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //第一步:创建连接connection using (var connection = factory.CreateConnection()) { //第二步:创建通道channel using (var channel = connection.CreateModel()) { //第三步:声明交换机exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //第四步:声明队列queue channel.QueueDeclare(queue: "myqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("生产者准备就绪...."); //第五步:绑定队列到交互机 channel.QueueBind(queue:"myqueue", exchange:"myexchange", routingKey:"mykey"); string message = ""; //第六步:发送消息 //在控制台输入消息,按enter键发送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //基本发布 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); Console.WriteLine($"消息【{message}】已发送到队列"); } } } Console.ReadKey(); } }
消费者(Consumer)代码:
class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //第一步:创建连接connection using (var connection = factory.CreateConnection()) { //第二步:创建通道channel using (var channel = connection.CreateModel()) { //第三步:声明队列queue channel.QueueDeclare(queue: "myqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //第四步:定义消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"接受到消息【{message}】"); }; Console.WriteLine("消费者准备就绪...."); //第五步:处理消息 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } } }
依次运行Producer和Consumer两个应用程序,运行结果如下:
注意:上边的代码在生产者和消费者的代码中都声明了exchange和queue,这主要是为了让这两个程序可以按任意顺序启动,如:我们只在生产者代码中定义了exchange和queue,却先启动消费者,这会让造成消费者找不到自己需要的exhange和queue(出现404错误)。实际开发中创建exchange/queue、绑定队列以及设置routingKey这些工作,都可以通过WebUI管理界面或者使用Rabbitmq Control工具完成。
QueueDeclare方法用于声明队列,ExchangeDeclare用于声明交换机,我们在使用这两个方法声明时,可以设置队列和交换机的属性,如queue的名字,长度限制,exchange是否持久化、交换机类型等。
2 QueueDeclare方法详解
在上边的栗子中我们使用了声明队列的方法 QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments) ,该方法通过参数设置队列的特性。这里介绍一下该方法 中几个参数的作用,先看一个完整的声明队列的栗子:
//声明队列newsQueue channel.QueueDeclare(queue: "myqueue", durable: false, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { //队列中消息的过期时间是8s { "x-message-ttl",1000*8 }, //队列60s没有被使用,则删除该队列 {"x-expires",1000*60 }, //队列最多保存100条消息 {"x-max-length",100 }, //队列中ready类型消息总共不能超过1000字节 {"x-max-length-bytes",1000 }, //当队列消息满了时,丢弃传来后续消息 {"x-overflow","reject-publish" }, //丢弃的消息发送到deadExchange交换机 {"x-dead-letter-exchange","deadExchange" }, //丢弃的消息发送到deadExchange交换机时的RoutingKey {"x-dead-letter-routing-key","deadKey" }, //队列中最大的优先级等级为10(在Publish消息时对每条消息设置优先级) {"x-max-priority",10 }, //设置队列默认为lazy {"x-queue-mode","lazy" } });
QueueDeclare方法的参数如下:
queue:队列名字;
durable:是否持久化。设置为true时,队列信息保存在rabbitmq的内置数据库中,服务器重启时队列也会恢复(注意:重启后队列内部的消息不会恢复,怎么实现消息持久化以后会详细介绍);
exclusive:是否排外。设置为true时只有首次声明该队列的Connection可以访问,其他Connection不能访问该队列;且在Connection断开时,队列会被删除(即使durable设置为true也会被删除);
autoDelete:是否自动删除。设置为true时,表示在最后一条使用该队列的连接(Connection)断开时,将自动删除这个队列;
arguments:设置队列的一些其它属性,为Dictionary<string,object>类型,下表总结了arguments中可以设置的常用属性。
参数名 | 作用 | 示例 |
Message TTL | 设置队列中消息的有效时间 | { "x-message-ttl",1000*8 },设置队列中的所有消息的有效期为8s; |
Auto expire | 自动删除队列。一定的时间内队列没有被使用,则自动删除队列 | {"x-expires",1000*60 },设置队列的过期时长为60s,如果60s没有队列被访问,则删除队列; |
Max length | 队列能保存消息的最大条数 | {"x-max-length",100 },设置队列最多保存100条消息; |
Max length bytes | 队列中ready类型消息的总字节数 | {"x-max-length-bytes",1000 }, 设置队列中ready类型消息总共不能超过1000字节; |
Overflow behaviour | 当队列消息满了时,再接收消息时的处理方法。有两种处理方案:默认为"drop-head"模式,表示从队列头部丢弃消息;"reject-publish "表示不接收后续的消息 |
{"x-overflow","reject-publish" },设置当队列消息满了时,丢弃传来后续消息; |
Dead letter exchange | 用于存储被丢弃的消息的交换机名。Overflow behaviour 的两种处理方案中丢弃的消息都会发送到这个交换机 | {"x-dead-letter-exchange","beiyongExchange" },设置丢弃的消息发送到名字位beiyongExchange的交换机; |
Dead letter routing key | 被丢弃的消息发送到Dead letter exchange时的使用的routing Key | {"x-dead-letter-routing-key","deadKey" },设置丢弃的消息发送到beiyongExchange交换机时的RoutingKey值是"deadKey"; |
Maximum priority | 设置队列中消息优先级的最大等级,在publish消息时可以设置单条消息的优先级等级 | {"x-max-priority",10 },设置中消息优先级的最大等级为10; |
Lazy mode | 设置队列的模式,如果设置为Lazy表示队列中消息尽可能存放在磁盘中,以减少内存占用;不设置时消息都存放在队列中,用以尽可能快的处理消息 | {"x-queue-mode","lazy"},3.6以后版本可用,设置队列中消息尽可能存放在磁盘中,以减少内存占用。在消息拥堵时和消息持久化配置使用可以减少内存占用。 |
3 ExchangeDeclare方法详解
声明交换机的方法 void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments) 可以设置交换机的特性,这里简单介绍一下这个方法的几个参数:
channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: new Dictionary<string, object> {
{"alternate-exchange","BeiyongExchange" }//如果消息不能路由到该交换机,就把消息路由到备用交换机BeiyongExchange上
});
exchange:交换机名字。
type:交换机类型。exchange有direct、fanout、topic、header四种类型,在下一篇会详细介绍;
durable:是否持久化。设置为true时,交换机信息保存在rabbitmq的内置数据库中,服务器重启时交换机信息也会恢复;
autoDelete:是否自动删除。设置为true时,表示在最后一条使用该交换机的连接(Connection)断开时,自动删除这个exchange;
arguments:其他的一些参数,类型为Dictionary<string,object> 。
小结
本节主要介绍了RabbitMQ的基本概念,在Windows和Centos上的安装方法,及RabbitMQ Control工具的基本使用,最后演示了一个C#驱动RabbitMQ的栗子,并详细介绍了声明queue和exchange的方法。通过这一节我们大概了解了RabbitMQ的基本使用。以后的章节会逐渐介绍RabbitMQ的四种exchange、两种Consumer的特点和使用场景,以及消息确认、优先级、持久化等,最后搭建一个高可用的RabbitMQ集群,如果文中有错误的话,希望大家可以指出,我会及时修改,谢谢!
参考文章