DotNet的MQTT实施
在DotNet上实施MQTT的更高层次的软件包。Es ist sogar .NET-Standard 2.0版本:
GitHub
Hier etwas Doku:
制备
以下代码显示了如何使用MqttFactory以最简单的方式创建新的MQTT客户端。
// Create a new MQTT client. var factory = new MqttFactory(); var mqttClient = factory.CreateMqttClient();
客户选择
MQTT客户端的所有选项都捆绑在一个名为MqttClientOptions的类中。可以通过属性手动在代码中填充选项,但建议使用MqttClientOptionsBuilder。此类提供了一个流畅的API,并通过提供一些重载和帮助程序方法来轻松设置选项。以下代码显示了如何将构建器与几个随机选项一起使用。
// Create TCP based options using the builder. var options = new MqttClientOptionsBuilder() .WithClientId("Client1") .WithTcpServer("broker.hivemq.com") .WithCredentials("bud", "%spencer%") .WithTls() .WithCleanSession() .Build();
TCP连接
以下代码显示了如何设置MQTT客户端的选项以利用基于TCP的连接。
// Use TCP connection. var options = new MqttClientOptionsBuilder() .WithTcpServer("broker.hivemq.com", 1883) // Port is optional .Build();
安全的TCP连接
以下代码显示了如何使用TLS安全的TCP连接(属性仅设置为参考):
// Use secure TCP connection. var options = new MqttClientOptionsBuilder() .WithTcpServer("broker.hivemq.com") .WithTls() .Build();
处理特殊证书
为了处理特殊的证书错误,可以使用特殊的验证回调(.NET Framework和netstandard)。对于UWP应用,可以使用属性。
// For .NET Framwork & netstandard apps: MqttTcpChannel.CustomCertificateValidationCallback = (x509Certificate, x509Chain, sslPolicyErrors, mqttClientTcpOptions) => { if (mqttClientTcpOptions.Server == "server_with_revoked_cert") { return true; } return false; }; // For UWP apps: MqttTcpChannel.CustomIgnorableServerCertificateErrorsResolver = o => { if (o.Server == "server_with_revoked_cert") { return new []{ ChainValidationResult.Revoked }; } return new ChainValidationResult[0]; };
WebSocket连接
为了使用WebSocket通信通道,需要以下代码。
// Use WebSocket connection. var options = new MqttClientOptionsBuilder() .WithWebSocketServer("broker.hivemq.com:8000/mqtt") .Build();
也可以通过调用UseTls()方法来使用安全的Web套接字连接,该方法会将协议从ws://切换到wss://。通常,需要子协议,该子协议可以直接添加到URI或专用属性中。
连接中
设置MQTT客户端选项后,可以建立连接。以下代码显示了如何与服务器连接。
// Use WebSocket connection. var options = new MqttClientOptionsBuilder() .WithWebSocketServer("broker.hivemq.com:8000/mqtt") .Build(); await client.ConnectAsync(options);
重新连接
如果与服务器的连接丢失,则会触发Disconnected事件。如果由于无法访问服务器等导致对ConnectAsync的调用失败,也会触发该事件。这允许仅调用ConnectAsync方法一次,并通过使用Disconnected事件来处理重试等。如果重新连接失败,则再次触发Disconnected事件。下面的代码显示了如何设置此行为,包括短暂的延迟。
mqttClient.Disconnected += async (s, e) => { Console.WriteLine("### DISCONNECTED FROM SERVER ###"); await Task.Delay(TimeSpan.FromSeconds(5)); try { await mqttClient.ConnectAsync(options); } catch { Console.WriteLine("### RECONNECTING FAILED ###"); } };
消费信息
以下代码显示了如何处理传入消息:
mqttClient.ApplicationMessageReceived += (s, e) => { Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); Console.WriteLine(); };
订阅主题
一旦与服务器建立连接,就可以订阅主题。以下代码显示了MQTT客户端连接后如何订阅主题。
client.Connected += async (s, e) => { Console.WriteLine("### CONNECTED WITH SERVER ###"); // Subscribe to a topic await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); Console.WriteLine("### SUBSCRIBED ###"); };
发布消息
可以直接使用属性或使用MqttApplicationMessageBuilder创建应用程序消息。此类具有一些有用的重载,可以轻松处理不同的有效负载格式。构建器的API是流畅的API。以下代码显示了如何编写应用程序消息并发布它们:
var message = new MqttApplicationMessageBuilder() .WithTopic("MyTopic") .WithPayload("Hello World") .WithExactlyOnceQoS() .WithRetainFlag() .Build(); await client.PublishAsync(message);
不需要填写应用程序消息的所有属性。以下代码显示了如何创建非常基本的应用程序消息:
var message = new MqttApplicationMessageBuilder() .WithTopic("/MQTTnet/is/awesome") .Build();
RPC呼叫
MQTTnet.Extensions.Rpc扩展名(以nuget形式提供)允许发送请求并等待匹配的答复。这是通过定义一个模式来完成的,该模式使用主题将请求和响应相关联。根据客户端使用情况,可以定义超时。以下代码显示了如何发送RPC调用。
var rpcClient = new MqttRpcClient(_mqttClient); var timeout = TimeSpan.FromSeconds(5); var qos = MqttQualityOfServiceLevel.AtMostOnce; var response = await rpcClient.ExecuteAsync(timeout, "myMethod", payload, qos);
响应请求的设备(Arduino,ESP8266等)需要解析并回复主题。以下代码显示了如何在C中实现处理程序。
// If using the MQTT client PubSubClient it must be ensured that the request topic for each method is subscribed like the following. _mqttClient.subscribe("MQTTnet.RPC/+/ping"); _mqttClient.subscribe("MQTTnet.RPC/+/do_something"); // It is not allowed to change the structure of the topic. Otherwise RPC will not work. So method names can be separated using // an _ or . but no +, # or . If it is required to distinguish between devices own rules can be defined like the following. _mqttClient.subscribe("MQTTnet.RPC/+/deviceA.ping"); _mqttClient.subscribe("MQTTnet.RPC/+/deviceB.ping"); _mqttClient.subscribe("MQTTnet.RPC/+/deviceC.getTemperature"); // Within the callback of the MQTT client the topic must be checked if it belongs to MQTTnet RPC. The following code shows one // possible way of doing this. void mqtt_Callback(char *topic, byte *payload, unsigned int payloadLength) { String topicString = String(topic); if (topicString.startsWith("MQTTnet.RPC/")) { String responseTopic = topicString + String("/response"); if (topicString.endsWith("/deviceA.ping")) { mqtt_publish(responseTopic, "pong", false); return; } } } // Important notes: // ! Do not send response message with the _retain_ flag set to true. // ! All required data for a RPC call and the result must be placed into the payload.