一、项目示例
1、appsettings.json
"Service": { "RabbitMQ": { "Enabled": true, "Conn": "host=47.94.211.196;virtualHost=/;username=admin;password=admin;timeout=60" } }
2、读取配置文件
这里只是简单的说明下,部分代码:
/// <summary> /// RabbitMQ 启用 /// </summary> public static bool? RabbitMQ_Enabled = Appsettings.GetApp(new string[] { "Service", "RabbitMQ", "Enabled" })?.ToBool(); /// <summary> /// RabbitMQ 虚拟主机 /// </summary> public static string RabbitMQ_Conn = Appsettings.GetApp(new string[] { "Service", "RabbitMQ", "Conn" });
3、EasyNetQ服务
这里使用的是EasyNetQ中间件,源码已经在github中公开,主要提供了对RabbitMQ操作的API。项目引入EasyNetQ:
4、Startup中添加RabbitMQ服务
扩展方法:
//rabbitmq消息队列 services.AddRabbitMQService();
具体的扩展方法:
namespace CommonCore.Helpers { static partial class Extention { public static IServiceCollection AddRabbitMQService(this IServiceCollection services) { if (services == null) throw new ArgumentNullException(nameof(services)); if ((bool)AppsettingsMap.RabbitMQ_Enabled) { services.AddSingleton(RabbitHutch.CreateBus(AppsettingsMap.RabbitMQ_Conn)); } return services; } } }
5、注入
using EasyNetQ;
using EasyNetQ.Topology;
public class BaseProvider { public BaseProvider(IBus bus, ILogger logger) { _logger = logger; _bus = bus; } private readonly IBus _bus; protected ILogger _logger;
}
IExchange exchange = await _bus.Advanced.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct);
await _bus.Advanced.PublishAsync(exchange, server, false, new MessageProperties() { DeliveryMode = 2 }, SerializeHelper.SerializeToBytes(new MessageOptions
{
}));
二、RabbitMQ管理界面
打开RabbitMQ后台界面,如上图。 右上角可以设置页面"刷新时间"。以及选择监听的"虚拟主机"。界面有"概要"、"连接"、"通道"、"分发器"、"队列"、"用户"等几个管理页签。下面进行详细介绍:
三、Overview
如下图,Overview 中包含的功能模块。Overview介绍了RabbitMQ的基本信息。
1、Totals
Totals 里面有 准备消费的消息数、待确认的消息数、消息总数以及消息的各种处理速率(发送速率、确认速率、写入硬盘速率等等)
2、Nodes
Nodes是支撑 RabbitMQ 运行的一些机器,相当于集群的节点。点击每个节点,可以查看节点的详细信息。RabbitMQ我只装在了一台服务器上,因而只能看到一个节点。
3、Churn statistics
里边展示的是 Connection、Channel 以及 Queue 的创建/关闭速率。
4、Ports and contexts
这个里边展示了端口的映射信息以及 Web 的上下文信息。
- 5672 是 RabbitMQ 通信端口。
- 15672 是 Web 管理页面端口。
- 25672 是集群通信端口。
5、Export definitions && Import definitions
这两个可以导入导出当前实例的一些配置信息:
四、Connections
这里主要展示的是当前连接上 RabbitMQ 的信息,无论是消息生产者还是消息消费者,只要连接上来了这里都会显示出来。
-
Protocal:AMQP 0-9-1 指的是 AMQP 协议的版本号。
-
User name:当前连接使用的用户名。
-
State:当前连接的状态,running 表示运行中;idle 表示空闲。
-
SSL/TLS:表示是否使用 ssl 进行连接。
-
Channels:当前连接创建的通道总数。
-
From client:每秒发出的数据包。
-
To client:每秒收到的数据包。
-
点击连接名称可以查看每一个连接的详情。在详情中可以查看每一个连接的通道数以及其他详细信息,也可以强制关闭一个连接。
五、Channels
这里展示的是通道的信息:
-
一个连接(IP)可以有多个通道,如上图,一共是两个连接,但是一共有 5 个通道。
-
一个连接可以有多个通道,这个多个通道通过多线程实现,一般情况下,我们在通道中创建队列、交换机等。
-
生产者的通道一般会立马关闭;消费者是一直监听的,通道几乎是会一直存在。
上面各项参数含义分别如下:
-
Channel:通道名称。
-
User name:该通道登录使用的用户名。
-
Model:通道确认模式,C 表示 confirm;T 表示事务。
-
State:通道当前的状态,running 表示运行中;idle 表示空闲。
-
Unconfirmed:待确认的消息总数。
-
Prefetch:Prefetch 表示每个消费者最大的能承受的未确认消息数目,简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了。总的来说,消费者负责不断处理消息,不断 ack,然后只要 unAcked 数少于 prefetch * consumer 数目,RabbitMQ 就不断将消息投递过去。
-
Unacker:待 ack 的消息总数。
-
publish:消息生产者发送消息的速率。
-
confirm:消息生产者确认消息的速率。
-
unroutable (drop):表示未被接收,且已经删除了的消息。
-
deliver/get:消息消费者获取消息的速率。
-
ack:消息消费者 ack 消息的速率。
六、Exchanges
这个地方展示交换机信息:
-
Type 表示交换机的类型。
-
Features 有两个取值 D 和 I。D 表示交换机持久化,将交换机的属性在服务器内部保存,当 MQ 的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新手动或执行代码去建立交换机,交换机会自动建立,相当于一直存在。I 表示这个交换机不可以被消息生产者用来推送消息,仅用来进行交换机和交换机之间的绑定。
-
Message rate in 表示消息进入的速率。 Message rate out 表示消息出去的速率。
-
点击下方的 Add a new exchange 可以创建一个新的交换机。
七、Queues
这个选项卡就是用来展示消息队列的:
-
Name:表示消息队列名称。
-
Features:表示消息队列的特性,D 表示消息队列持久化。
-
State:表示当前队列的状态,running 表示运行中;idle 表示空闲。
-
Ready:表示待消费的消息总数。
-
Unacked:表示待应答的消息总数。
-
Total:表示消息总数 Ready+Unacked。
-
incoming:表示消息进入的速率。
-
deliver/get:表示获取消息的速率。
-
ack:表示消息应答的速率。
八、Admin
这里是做一些用户管理操作,如下图:
-
Name:表示用户名称。
-
Tags:表示角色标签,只能选取一个。
-
Can access virtual hosts:表示允许进入的虚拟主机。
-
Has password:表示这个用户是否设置了密码。