Ø 简介
C# 实现消息队列的方式有很多种,比如:MSMQ、RabbitMQ、EQueue 等,本文主要介绍使用 RabbitMQ 实现消息队列的基础入门。包括如下内容:
1. 什么是消息队列?
2. 什么是 RabbitMQ?
3. 安装 RabbitMQ Server
4. RabbitMQ 的基本运用
5. 远程部署 RabbitMQ 服务
6. RabbitMQ 常用命令
1. 什么是消息队列?
消息队列 MQ(全称为 Message Queue),可实现两个应用程序之间进行通信,MQ 是生产者与消费者模型的典型代表,一端往消息队列中写入消息,另一端可以读取或者订阅队列中的消息。MQ 遵循的是 AMQP 协议(高级消息队列协议:使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能)的具体实现和产品。
MQ 是一种消息中间件技术,所以它能够支持多种类型的语言开发,同时也是跨平台的通信机制,也就是说 MQ 支持将信息转化为 XML 或者 Json 等类型的数据存储到消息队列中,然后可以使用不同的语言来处理消息队列中的消息,这样就很容易的做到了信息的通信,同时也为信息的通信起到了缓冲的作用,经常会在金融项目中使用这种通信机制。
MQ 的主要作用用于提高系统的并发性,将一些不需要及时响应客户端且占用较多资源的操作放入队列,再由另外一个线程去异步处理这些队列,可极大的提高系统的并发能力。
消息传递相比较文件传递与远程调用(RPC)而言,视乎更胜一筹,因为跟平台无关,并能够很好的支持并发与异步调用,所以通常用于以下情况:
1. 对操作的实时性要求不高;
2. 缓冲(消息/数据);
3. 需要执行的任务比较耗时;
4. 存在异构系统间的整合;
2. 什么是 RabbitMQ?
RabbitMQ 是一个在 AMQP 基础上完整的,可复用的企业消息系统,它遵循 Mozilla Public License 开源协议。
RabbitMQ 是部署最广泛的开源消息代理。RabbitMQ 拥有成千上万的用户,是最受欢迎的开源消息代理之一。RabbitMQ 在全球范围内的小型初创企业和大型企业中都得到使用。
RabbitMQ 轻巧,易于在内部和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足大规模,高可用性的要求。RabbitMQ 官方网址:https://www.rabbitmq.com/
1) RabbitMQ 的特点
支持多种消息传递协议;
分布式部署;
支持多平台部署,例如:Windows Server、Linux 等;
支持多语言运用,例如:C#、Java、PHP、Python 等;
3. 安装 RabbitMQ Server
1) 进入官网下载地址
https://www.rabbitmq.com/#getstarted
2) 点击 Download + Installation
3) 点击官网推荐的 Windows 系统的安装包
4) Windows 上安装 RabbitMQ,推荐了两个安装选项:
1. 使用 Chocolatey 安装;
Chocolatey 类似于 Nuget 软件包管理工具,这里不使用该方式安装。
2. 使用官方安装程序;
这里使用该方式进行安装,点击【Using the official installer】。
5) 这里说了,在 Windows 上运行 RabbitMQ 依赖于 Erlang 框架,所以先下载并安装它:
说明:RabbitMQ 服务是用 Erlang 语言编写的,Erlang 类似于 C# 调用的 .NET Framework 框架。
1. 点击 OTP 22.1 Windows 64-bit Binary File (264) 进行下载
2. 下载完成后进行安装(注意:需要以管理员身份安装)
默认安装在 C:Program Fileserl10.5 目录下
3. 安装完成后,设置环境变量
系统变量 -> Path -> 编辑 -> 新建,输入:C:Program Fileserl10.5in,保存即可。
4. 检查是否安装成功,运行 CMD
6) 依然是刚才的页面,下载 RabbitMQ Server
7) 下载后直接安装
默认安装在 C:Program FilesRabbitMQ Server 目录下
8) 检查是否安装成功,运行 CMD
CD 到 RabbitMQ 的 sbin 目录下,输入:CD C:Program FilesRabbitMQ Server abbitmq_server-3.8.1sbin
检查安装状态,输入:rabbitmqctl status
启用 RabbitMQ 管理工具,输入:rabbitmq-plugins enable rabbitmq_management
进入管理页面,浏览器地址栏输入:http://127.0.0.1:15672/
另外,在 Windows 服务列表中也可以找到 RabbitMQ Server
4. RabbitMQ 的基本运用
1) 首先创建两个控制台应用程序
RabbitMQProducer(生产者),用于向消息队列中发送消息;
RabbitMQConsumer(消费者),用于接收消息队列中的消息。
2) 添加 RabbitMQ.Client 客户端程序集
提示:对于 .NET Framework 4.0 的目标项目,RabbitMQ.Client 最高支持的版本是 RabbitMQ.Client.3.5.7。
3) 向消息队列中发送消息
1. C# 代码
using RabbitMQ.Client;
var factory = new ConnectionFactory(); //连接 RabbitMQ 工厂实例
factory.HostName = "localhost"; //要连接到的主机,默认为 localhost
factory.Port = 5672; //连接断开,默认为 -1(5672)
factory.UserName = "guest"; //RabbitMQ 连接用户名,默认为 guest
factory.Password = "guest"; //RabbitMQ 连接密码,默认为 guest
//创建连接对象
using (var connection = factory.CreateConnection())
{
//创建一个新的通道、会话和模型
using (var channel = connection.CreateModel())
{
/*
* 创建一个名为 myQueue1 的消息队列,如果名称相同不会重复创建,参数解释:
* 参1:myQueue1, 消息队列名称;
* 参2:false, 是否持久化,持久化的队列会存盘,服务器重启后任然存在;
* 参3:false, 是否为排他队列,排他队列表示仅对首次声明它的连接可见,并在连接断开时自动删除。这种队列适用于一个客户端同时发送和读取消息的应用场景。
* 参4:false, 是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
* 参5:设置队列的其他一些参数,如 x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-rnax-priority 等。
*/
channel.QueueDeclare("myQueue1", false, false, false, null);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1;
string message = "Hello RabbitMQ"; //消息内容
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "myQueue1", properties, body); //发送(生产)消息
Console.WriteLine($"Send Message: {message}");
}
}
Console.ReadLine();
2. 运行以上代码,使用 rabbitmqctl list_queues 命令查看结果:
3. 也可以使用管理工具查看
4) 接收消息队列中的消息
var factory = new ConnectionFactory(); //连接 RabbitMQ 工厂实例
factory.HostName = "localhost"; //要连接到的主机,默认为 localhost
factory.Port = 5672; //连接断开,默认为 -1(5672)
factory.UserName = "guest"; //RabbitMQ 连接用户名,默认为 guest
factory.Password = "guest"; //RabbitMQ 连接密码,默认为 guest
//不能放入 using 语句中,否则当 using 语句结束后会 Close 连接,EventingBasicConsumer.Received 事件将不会被触发
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare("myQueue1", false, false, false, null);
var consumer = new EventingBasicConsumer(channel); //消费者(指定消息通道)
channel.BasicConsume("myQueue1", true, consumer); //消费消息(在当前通道中监听 myQueue1 队列,并进行消费)
//该事件在接收到消息时触发
consumer.Received += (sender, e) =>
{
byte[] body = e.Body; //消息字节数组
string message = Encoding.UTF8.GetString(body); //消息内容
Console.WriteLine($"Received: {message}, {DateTime.Now.ToString("HH: mm:ss fff")}");
};
Console.ReadLine();
connection.Close();
channel.Close();
5) 模拟频繁发送/接收消息
1. 修改代码
for (int i = 0; i < 100; i++)
{
string message = $"Hello RabbitMQ {i}";
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "myQueue1", properties, body);
Console.WriteLine($"Send: {message}, {DateTime.Now.ToString("HH:mm:ss fff")}");
System.Threading.Thread.Sleep(1000); //间隔1秒钟发送一次
}
2. 运行效果
5. 远程部署 RabbitMQ 服务
我们之前是将 RabbitMQ 服务部署在本机,以 localhost 的主机名去访问。在实际工作通常是将 RabbitMQ 服务部署在远程的服务器上,提供给更多的客户端去访问。下面是具体的操作步骤:
1) 具体的安装步骤与本地安装一致(可以参考本地安装)
2) 开放 5672 端口(这是 RabbitMQ 服务的默认端口)
1. 打开防火墙,开放 5672 端口的访问;
2. 如果是阿里云 ECS 服务器,还需要在安全组规则开放对 5672 端口访问。
3) RabbitMQ 默认不支持 guest 用户远程访问
4) 解决办法很简单,添加一个新用户即可
1. 打开 RabbitMQ 管理工具,进入用户管理 Tab
2. 添加用户后,默认是没有权限的,点击用户名进行权限设置
3. 点击 Set permission 即可
4. 另外,也可以使用命令进行权限设置
rabbitmqctl set_permissions -p "/" abeam "." "." ".*"
6. RabbitMQ 常用命令
首先,进入 RabbitMQ 命令
CMD: CD C:Program FilesRabbitMQ Server abbitmq_server-3.8.1sbin
创建 abeam 用户并指定密码 | rabbitmqctl add_user abeam abeamli |
设置 abeam 用户读写所有队列的权限 | rabbitmqctl set_permissions abeam ".*" ".*" ".*" |
设置 abeam 用户所属的用户组 | rabbitmqctl set_user_tags abeam administrator |
查看用户列表 | rabbitmqctl list_users |
修改 abeam 用户的密码为 abeam123 | rabbitmqctl change_password abeam abeam123 |
删除 abeam 用户 | rabbitmqctl delete_user abeam |