使用MQTTnet部署MQTT服务
下载地址:https://github.com/chkr1011/MQTTnet
引用地址:https://www.cnblogs.com/zhaoqm999/p/12960677.html
一. 服务端
1. 创建配置参数
可以使用 `var options = new MqttServerOptions();` 直接构建一个options。你也可以通过参数构建器 `var options = new MqttServerOptionsBuilder();` 使代码更简洁美观。
构建器的函数说明:
函数名 | 功能说明 |
Build | 构建配置参数 |
WithApplicationMessageInterceptor | 允许处理来自客户端的所有已发布消息 |
WithClientId | 服务端发布消息时使用的ClientId |
WithConnectionBacklog | 设置要保留的连接数 |
WithConnectionValidator | 验证连接 |
WithDefaultCommunicationTimeout | 设置默认的通信超时 |
WithDefaultEndpoint | 使用默认端点 |
WithDefaultEndpointBoundIPAddress | 使用默认端点IPv4地址 |
WithDefaultEndpointBoundIPV6Address | 使用默认端点IPv6地址 |
WithDefaultEndpointPort | 使用默认端点端口 |
WithEncryptedEndpoint | 使用加密的端点 |
WithEncryptedEndpointBoundIPAddress | 使用加密的端点IPv4地址 |
WithEncryptedEndpointBoundIPV6Address | 使用加密的端点IPv6地址 |
WithEncryptedEndpointPort | 使用加密的端点端口 |
WithEncryptionCertificate | 使用证书进行SSL连接 |
WithEncryptionSslProtocol | 使用SSL协议级别 |
WithMaxPendingMessagesPerClient | 每个客户端允许最多未决消息 |
WithPersistentSessions | 保持会话 |
WithStorage | 使用存储 |
WithSubscriptionInterceptor | 允许处理来自客户端的所有订阅 |
WithoutDefaultEndpoint | 禁用默认端点 |
WithoutEncryptedEndpoint | 禁用默认(SSL)端点 |
验证账号密码
- options.WithConnectionValidator(c =>
- {
- if (c.Username != "seven")
- {
- c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
- }
- });
2. 启动服务端
- var server = new MqttFactory().CreateMqttServer();
- server.StartAsync(options.Build());
服务启动事件
- server.StartedHandler = new MqttServerStartedHandlerDelegate(Started);
- static async Task Started(EventArgs e)
- {
- Console.WriteLine("Started");
- }
服务关闭事件
- server.StoppedHandler = new MqttServerStoppedHandlerDelegate(Stopped);
- static async Task Stopped(EventArgs e)
- {
- Console.WriteLine("Stopped");
- }
客户端连接事件
- server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(Connected);
- server.UseClientConnectedHandler(c => Connected(c));
- static async Task Connected(MqttServerClientConnectedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} connected");
- }
客户端断开事件
- server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(Disconnected);
- server.UseClientDisconnectedHandler(c => Disconnected(c));
- static async Task Disconnected(MqttServerClientDisconnectedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} disconnected");
- }
客户端订阅Topic
- server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(Subscribed);
- static async Task Subscribed(MqttServerClientSubscribedTopicEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} subscribed {e.TopicFilter.Topic}");
- }
客户端取消订阅Topic
- server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(c => Unsubscribed(c));
- static async Task Unsubscribed(MqttServerClientUnsubscribedTopicEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} unsubscribed {e.TopicFilter}");
- }
消息接收
- server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceived);
- server.UseApplicationMessageReceivedHandler(c => MessageReceived(c));
- static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId} get {e.ApplicationMessage.Topic}");
- }
3. 操作
发布消息
- var message = new MqttApplicationMessage()
- {
- Topic = "testTopic",
- Payload = Encoding.UTF8.GetBytes("hello seven")
- };
- server.PublishAsync(message);
查询客户端状态
- var list = server.GetClientStatusAsync().Result;
获取会话信息
- var sessions = server.GetSessionStatusAsync().Result;
查询Retain的消息
- var messages = server.GetRetainedApplicationMessagesAsync().Result;
清空Retain消息
- server.ClearRetainedApplicationMessagesAsync();
停止服务
- server.StopAsync();
4. 其他
自带的日志跟踪
- var logger = new MqttNetLogger();
- logger.LogMessagePublished += LogMessagePublished;
- private static void LogMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
- {
- Console.WriteLine(e.LogMessage);
- Console.WriteLine(e.TraceMessage);
- }
二. 客户端
1. 创建配置参数
可以使用 `var options = new MqttClientOptions();` 直接构建一个options。你也可以通过参数构建器 `var options = new MqttClientOptionsBuilder();` 使代码更简洁美观。
构建器的函数说明:
函数名 | 功能说明 |
Build | 构建配置参数 |
WithAuthentication | 允许使用不同的身份验证模式 |
WithCleanSession | 将客户端与MQTT干净会话支持一起使用 |
WithClientId | 设置客户端ID |
WithCommunicationTimeout | 设置通信超时 |
WithCredentials | 设置登录凭证 |
WithExtendedAuthenticationExchangeHandler | 以自定义方式处理身份验证 |
WithKeepAlivePeriod | 设置保持有效期 |
WithKeepAliveSendInterval | 设置保活的发送间隔 |
WithMaximumPacketSize | 设置最大数据包大小 |
WithNoKeepAlive | 不要使用保持活动状态 |
WithProtocolVersion | 设置MQTT协议版本 |
WithProxy | 设置代理 |
WithTls | 客户端使用SSL/TLS |
WithTopicAliasMaximum | 允许最大数量的主题别名 |
WithReceiveMaximum | 允许最大数量的已接收数据包 |
WithRequestProblemInformation | 显示请求问题信息 |
WithRequestResponseInformation | 显示请求响应问题信息 |
WithSessionExpiryInterval | 一段时间后终止会话 |
WithTcpServer | 告诉客户端(通过TCP)连接到哪个MQTT代理。 |
WithWebSocketServer | 告诉客户端(通过WebSocket)连接到哪个MQTT代理 |
WithWillMessage | 告诉客户端最后一条消息将被发送。 |
WithWillDelayInterval | 告诉客户端最后一个消息得延迟间隔 |
2. 连接服务端
- var client = new MqttFactory().CreateMqttClient();
- client.ConnectAsync(options.Build());
客户端连接完成事件
- client.UseConnectedHandler(e => Connected(e));
- static async Task Connected(MqttClientConnectedEventArgs e)
- {
- Console.WriteLine($"Connected");
- }
客户端断开事件
- client.UseDisconnectedHandler(e => Disconnected(e));
- static async Task Disconnected(MqttClientDisconnectedEventArgs e)
- {
- Console.WriteLine($"Disconnected");
- }
订阅主题(必须在成功连接以后才生效)
- client.UseApplicationMessageReceivedHandler(e => MessageReceived(e));
- static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
- {
- Console.WriteLine($"{e.ClientId}");
- }
发布消息( 必须在成功连接以后才生效 )
- var message = new MqttApplicationMessageBuilder()
- .WithTopic("myTopic")
- .WithPayload("seven365.cn")
- .WithExactlyOnceQoS()
- .WithRetainFlag()
- .Build();
- client.PublishAsync(message, CancellationToken.None);
3. 操作
客户端重连
- client.ReconnectAsync();
客户端断开
- client.DisconnectAsync