什么是MQTT
MQTT(message queuing telemetry transport)是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的。由于以上轻量级的特点,是实现智能家居的首选传输协议,相比于XMPP,更加轻量级而且占用宽带低。简单来说HQTT是一种通信协议,要实现发布/订阅就必须遵循这个协议。
二、实现MQTT通讯协议.NET开源库有哪些?
MQTTnet、MqttDotNet、nMQTT、M2MQTT等,这里我们使用MQTTnet(但MQTTnet搜到的教程基本都是2.7及以下版本的,我们使用的是3.0.9版本)
三、展示MQTT实现效果图
例:客户端1只要订阅了positon主题,客户端2、客户端3、客户端4.....同样订阅了position主题则他们之间就能共享position主题的所发的内容了。
如果客户端1订阅了position主题,客户端2订阅了beautiful主题,1发给消息2是收不到的。
四、创建.NETCore项目(Server和Client)
五、服务器
添加Nuget包:安装MQTTnet
class Program
{
public static IMqttServer mqttServer;
static void Main(string[] args)
{
StartMqttServer();
}
//启动Mqtt服务器
private static async void StartMqttServer()
{
try
{
//验证客户端信息
var options = new MqttServerOptions
{
//连接验证
ConnectionValidator = new MqttServerConnectionValidatorDelegate(p =>
{
if (p.ClientId == "SpecialClient")
{
if (p.Username != "USER" || p.Password != "PASS")
{
p.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
}
}
})
};
//设置端口号
options.DefaultEndpointOptions.Port = 8031;
//创建Mqtt服务器
mqttServer = new MqttFactory().CreateMqttServer();
//开启订阅事件
mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic);
//取消订阅事件
mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic);
//客户端消息事件
mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceived);
//客户端连接事件
mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected);
//客户端断开事件
mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected);
//启动服务器
await mqttServer.StartAsync(options);
Console.WriteLine("服务器启动成功!输入任意内容并回车停止服务!");
Console.ReadLine();
await mqttServer.StopAsync();
}
catch (Exception e)
{
Console.Write($"服务器启动失败 Msg:{e}");
}
}
/// <summary>
/// 客户订阅
/// </summary>
private static void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
{
//客户端Id
var ClientId = e.ClientId;
var Topic = e.TopicFilter.Topic;
Console.WriteLine($"客户端[{ClientId}]已订阅主题:{Topic}");
}
/// <summary>
/// 客户取消订阅
/// </summary>
private static void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
{
//客户端Id
var ClientId = e.ClientId;
var Topic = e.TopicFilter;
Console.WriteLine($"客户端[{ClientId}]已取消订阅主题:{Topic}");
}
/// <summary>
/// 接收消息
/// </summary>
private static void MqttServe_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
var ClientId = e.ClientId;
var Topic = e.ApplicationMessage.Topic;
var Payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
var Qos = e.ApplicationMessage.QualityOfServiceLevel;
var Retain = e.ApplicationMessage.Retain;
Console.WriteLine($"客户端[{ClientId}]>> 主题:[{Topic}] 负载:[{Payload}] Qos:[{Qos}] 保留:[{Retain}]");
}
/// <summary>
/// 客户连接
/// </summary>
private static void MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e)
{
var ClientId = e.ClientId;
Console.WriteLine($"客户端[{ClientId}]已连接");
}
/// <summary>
/// 客户连接断开
/// </summary>
private static void MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e)
{
var ClientId = e.ClientId;
Console.WriteLine($"客户端[{ClientId}]已断开连接");
}
}
六、客户端
也要添加Nuget包:安装MQTTnet
public static IMqttClient mqttClient;
static void Main(string[] args)
{
ConnectMqttServerAsync();
ImportData();
}
private static async void ConnectMqttServerAsync()
{
try
{
var factory = new MqttFactory();
mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer("127.0.0.1", 8031)
.WithCredentials("test", "test")
.WithClientId(Guid.NewGuid().ToString().Substring(0, 5))
.Build();
//消息
mqttClient.UseApplicationMessageReceivedHandler(e =>
{
Console.WriteLine("### 收到的信息 ###");
Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");//主题
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");//页面信息
Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");//消息等级
Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");//是否保留
Console.WriteLine();
});
//重连机制
mqttClient.UseDisconnectedHandler(async e =>
{
Console.WriteLine("与服务器断开连接!");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(options);
}
catch (Exception exp)
{
Console.Write($"重新连接服务器失败 Msg:{exp}");
}
});
await mqttClient.ConnectAsync(options);
Console.Write("连接服务器成功!输入任意内容并回车进入菜单页面!");
}
catch (Exception exp)
{
Console.Write($"连接服务器失败 Msg:{exp}");
}
}
private static void ImportData()
{
Console.ReadLine();
bool isExit = false;
while (!isExit)
{
Console.WriteLine(@"请输入
1.订阅主题
2.取消订阅
3.发送消息
4.退出");
var input = Console.ReadLine();
switch (input)
{
case "1":
Console.WriteLine(@"请输入主题名称:");
var topicName = Console.ReadLine();
Subscribe(topicName);
break;
case "2":
Console.WriteLine(@"请输入需要取消订阅主题名称:");
topicName = Console.ReadLine();
Unsubscribe(topicName);
break;
case "3":
Console.WriteLine("请输入需要发送的主题名称");
topicName = Console.ReadLine();
Console.WriteLine("请输入需要发送的消息");
var message = Console.ReadLine();
Publish(topicName, message);
break;
case "4":
isExit = true;
break;
default:
Console.WriteLine("请输入正确指令!");
break;
}
}
}
/// <summary>
/// 订阅
/// </summary>
/// <param name="topicName"></param>
private static async void Subscribe(string topicName)
{
string topic = topicName.Trim();
if (string.IsNullOrEmpty(topic))
{
Console.Write("订阅主题不能为空!");
return;
}
if (!mqttClient.IsConnected)
{
Console.Write("MQTT客户端尚未连接!");
return;
}
await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
}
/// <summary>
/// 取消订阅
/// </summary>
/// <param name="topicName"></param>
private static async void Unsubscribe(string topicName)
{
string topic = topicName.Trim();
if (string.IsNullOrEmpty(topic))
{
Console.Write("订阅主题不能为空!");
return;
}
if (!mqttClient.IsConnected)
{
Console.Write("MQTT客户端尚未连接!");
return;
}
await mqttClient.UnsubscribeAsync(topic);
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="message"></param>
private static async void Publish(string topicName, string message)
{
string topic = topicName.Trim();
string msg = message.Trim();
if (string.IsNullOrEmpty(topic))
{
Console.Write("主题不能为空!");
return;
}
if (!mqttClient.IsConnected)
{
Console.Write("MQTT客户端尚未连接!");
return;
}
var MessageBuilder = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(msg)
.WithExactlyOnceQoS()
.WithRetainFlag()
.Build();
await mqttClient.PublishAsync(MessageBuilder);
}
源代码:
链接:https://pan.baidu.com/s/1rxTBZHHAmkDVcO6XmJPXng
提取码:05qr