zoukankan      html  css  js  c++  java
  • .Net5开发MQTT服务器

    本文基于 .NET 5,主要借助 MQTTnet 包自主开发 MQTT 服务器;经测试,运行非常稳定。

     

      

    using IoT;
    using JieYun.Admin.Net5;
    using JieYun.IoT.Common.Models;
    using JieYun.IoT.Server.Services;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using MQTTnet;
    using MQTTnet.Protocol;
    using MQTTnet.Server;
    using System;
    using System.Text;
    using System.Text.Json;
    using System.Threading;
    using System.Threading.Tasks;
    using static IoT.IoTRpc;
    
    namespace JieYun.IoT.Server
    {
        /// <summary>
        /// Hosted background service that creates and starts an MQTTnet broker,
        /// logs subscribe/unsubscribe/connect/disconnect events, and republishes
        /// selected messages on <c>MQTTTOPIC.UPDATE</c>.
        /// </summary>
        public class ServerWorker : BackgroundService
        {
            /// <summary>
            /// The broker instance created by <see cref="StartMqttServer"/>.
            /// Remains null until startup has reached the factory call.
            /// </summary>
            public static IMqttServer mqttServer;

            private readonly ILogger<ServerWorker> _logger;

            // Stored but not used anywhere in this class as shown.
            private readonly IoTRpcClient _client;

            /// <summary>
            /// Creates the worker.
            /// </summary>
            /// <param name="logger">Logger used for all broker events.</param>
            /// <param name="client">RPC client; stored in a field for later use.</param>
            /// <param name="provider">Not used by this class; kept so the constructor signature is unchanged.</param>
            public ServerWorker(ILogger<ServerWorker> logger, IoTRpcClient client, IServiceProvider provider)
            {
                _logger = logger;
                _client = client;
            }

            /// <summary>
            /// Starts the MQTT broker when the host starts this service.
            /// </summary>
            /// <param name="stoppingToken">Host shutdown token (not observed here; shutdown is handled in <see cref="StopAsync"/>).</param>
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                await StartMqttServer();
            }

            /// <summary>
            /// Stops the broker (if one was created) when the host shuts down, then
            /// runs the base class shutdown logic.
            /// </summary>
            /// <param name="cancellationToken">Token signalling that shutdown should no longer be graceful.</param>
            public override async Task StopAsync(CancellationToken cancellationToken)
            {
                try
                {
                    if (mqttServer != null)
                    {
                        await mqttServer.StopAsync();
                        _logger.LogInformation("MQTT服务器已停止.");
                    }
                }
                catch (Exception e)
                {
                    // A failure to stop the broker must not prevent the host from shutting down.
                    _logger.LogError(e, "MQTT服务器停止失败.");
                }
                finally
                {
                    await base.StopAsync(cancellationToken);
                }
            }

            /// <summary>
            /// Builds the broker options, wires up the event handlers and starts the broker.
            /// Any exception raised during startup is logged and swallowed.
            /// </summary>
            private async Task StartMqttServer()
            {
                try
                {
                    // Settings used to validate connecting clients.
                    int hostPort = AppConfigProvider.AppConfig.IoTServerPort; // listening port
                    int timeout = 5; // communication timeout, in seconds

                    // NOTE(review): credentials are hard-coded in source; consider moving them to configuration.
                    string username = "admin";
                    string password = "admin";

                    // To bind to a specific address, add
                    // .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(AppConfigProvider.AppConfig.IoTServerAddress)).
                    var optionBuilder = new MqttServerOptionsBuilder()
                       .WithDefaultEndpointPort(hostPort)
                       .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(timeout))
                       .WithConnectionValidator(t =>
                       {
                           if (t.Username != username || t.Password != password)
                           {
                               // Reject and stop here; falling through would overwrite the
                               // rejection with Success and accept every client.
                               t.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                               return;
                           }

                           t.ReasonCode = MqttConnectReasonCode.Success;
                       });
                    var options = optionBuilder.Build();

                    // Create the broker.
                    mqttServer = new MqttFactory().CreateMqttServer();

                    // Topic subscribed.
                    mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic);

                    // Topic unsubscribed.
                    mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic);

                    // Application message received.
                    mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceivedAsync);

                    // Client connected.
                    mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected);

                    // Client disconnected.
                    mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected);

                    // Start listening.
                    await mqttServer.StartAsync(options);

                    _logger.LogInformation("MQTT服务器已启动.");
                }
                catch (Exception e)
                {
                    _logger.LogError($"MQTT服务启动失败:{e}");
                }
            }

            /// <summary>
            /// Logs that a client subscribed to a topic.
            /// </summary>
            private void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
            {
                var clientId = e.ClientId;
                var topic = e.TopicFilter.Topic;
                _logger.LogInformation($"客户端【{clientId}】订阅:{topic}");
            }

            /// <summary>
            /// Logs that a client unsubscribed from a topic.
            /// </summary>
            private void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
            {
                var clientId = e.ClientId;
                var topic = e.TopicFilter;
                _logger.LogInformation($"客户端【{clientId}】取消订阅:{topic}");
            }

            /// <summary>
            /// Logs every received message and, when it is addressed to "SERVER" on
            /// <c>MQTTTOPIC.UPDATE</c>, republishes it addressed to "WebClient".
            /// Messages with an empty or non-JSON payload are ignored instead of throwing.
            /// </summary>
            private async Task MqttServe_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
            {
                _logger.LogInformation(e.ApplicationMessage.ToConsoleMessage());

                // An MQTT message may carry no payload at all; there is nothing to forward then.
                var payload = e.ApplicationMessage.Payload;
                if (payload == null || payload.Length == 0)
                {
                    return;
                }

                MQTTMessage msg;
                try
                {
                    msg = JsonSerializer.Deserialize<MQTTMessage>(Encoding.UTF8.GetString(payload));
                }
                catch (JsonException ex)
                {
                    // Clients can publish arbitrary bytes; a malformed payload must not fault the handler.
                    _logger.LogWarning(ex, "无法解析主题【{Topic}】上的消息负载,已忽略.", e.ApplicationMessage.Topic);
                    return;
                }

                // The JSON literal "null" deserializes to a null reference.
                if (msg == null)
                {
                    return;
                }

                // Forward the message to the WebClient.
                if (msg.To == "SERVER" && e.ApplicationMessage.Topic == MQTTTOPIC.UPDATE)
                {
                    var msgSend = new MQTTMessage()
                    {
                        From = msg.From,
                        To = "WebClient",
                        Msg = msg.Msg
                    };
                    await SendMessageToWebClient(msgSend);
                }
            }

            /// <summary>
            /// Logs a client connection and publishes a "connected" status message for it.
            /// </summary>
            private async Task MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e)
            {
                var clientId = e.ClientId;
                _logger.LogInformation($"{DateTime.Now} 客户端【{clientId}】已连接");

                // Announce the connection; the status message is addressed to "SERVER".
                var msg = new MQTTMessage()
                {
                    From = clientId,
                    To = "SERVER",
                    Msg = ClientStatus.Connected
                };
                await SendMessageToWebClient(msg);
            }

            /// <summary>
            /// Logs a client disconnection and publishes a "disconnected" status message for it.
            /// </summary>
            private async Task MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e)
            {
                var clientId = e.ClientId;

                _logger.LogInformation($"{DateTime.Now} 客户端【{clientId}】已断开");

                // Announce the disconnection; the status message is addressed to "SERVER".
                var msg = new MQTTMessage()
                {
                    From = clientId,
                    To = "SERVER",
                    Msg = ClientStatus.Disconnected
                };
                await SendMessageToWebClient(msg);
            }

            /// <summary>
            /// Serializes <paramref name="msg"/> to UTF-8 JSON and publishes it through the
            /// broker on <c>MQTTTOPIC.UPDATE</c>.
            /// </summary>
            /// <param name="msg">The message to publish.</param>
            private async Task SendMessageToWebClient(MQTTMessage msg)
            {
                var msgStr = JsonSerializer.Serialize(msg);
                var payload = Encoding.UTF8.GetBytes(msgStr);
                MqttApplicationMessage mm = new MqttApplicationMessage()
                {
                    Topic = MQTTTOPIC.UPDATE,
                    Payload = payload
                };
                await mqttServer.PublishAsync(mm);
            }
        }
    }
    消息类MQTTMessage
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Text.Json;
    
    namespace JieYun.IoT.Common.Models
    {
        /// <summary>
        /// Message envelope exchanged over MQTT as JSON: a body plus sender and
        /// recipient identifiers.
        /// </summary>
        public class MQTTMessage
        {
            /// <summary>Message body.</summary>
            public string Msg { get; set; }

            /// <summary>Sender identifier; defaults to "SERVER".</summary>
            public string From { get; set; } = "SERVER";

            /// <summary>
            /// Recipient identifier; defaults to "e098060e71ef".
            /// NOTE(review): the default looks like a specific device id - confirm it is intended.
            /// </summary>
            public string To { get; set; } = "e098060e71ef";

            /// <summary>
            /// Returns this message serialized as JSON.
            /// </summary>
            public override string ToString() => JsonSerializer.Serialize<MQTTMessage>(this);
        }
    }
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Threading.Tasks;
    
    namespace JieYun.IoT.Server
    {
        /// <summary>
        /// Application entry point: builds a Kestrel web host from command-line
        /// configuration, registers <c>ServerWorker</c> as a hosted service and runs the host.
        /// </summary>
        public class Program
        {
            /// <summary>
            /// Builds and runs the web host; blocks until the host shuts down.
            /// </summary>
            /// <param name="args">Command-line arguments, exposed to the host as configuration.</param>
            public static void Main(string[] args)
            {
                var commandLineConfig = new ConfigurationBuilder()
                    .AddCommandLine(args)
                    .Build();

                var webHost = new WebHostBuilder()
                    .UseConfiguration(commandLineConfig)
                    .UseKestrel()
                    .UseContentRoot(Directory.GetCurrentDirectory())
                    .UseStartup<Startup>()
                    .ConfigureServices(RegisterHostedServices)
                    .ConfigureLogging(SetUpLogging)
                    .Build();

                webHost.Run();
            }

            // Registers the MQTT worker so the host starts it alongside the web server.
            private static void RegisterHostedServices(WebHostBuilderContext hostContext, IServiceCollection services)
            {
                services.AddHostedService<ServerWorker>();
            }

            // Replaces the default logging providers with console output at Trace level.
            private static void SetUpLogging(ILoggingBuilder logging)
            {
                logging.ClearProviders();
                logging.SetMinimumLevel(LogLevel.Trace);
                logging.AddConsole();
            }
        }
    }
  • 相关阅读:
    ios 一个正则表达式测试(只可输入中文、字母和数字)
    IOS7 8中tableview分割线缺少15像素
    Java中使用OpenSSL生成的RSA公私钥进行数据加解密
    java与IOS之间的RSA加解密
    ios下使用rsa算法与php进行加解密通讯
    C# 32位md5
    [原]命令模式在MVC框架中的应用
    [原]【推荐】程序员必读的三十本经典巨作
    [原]容器学习(二):动手模拟AOP
    [原]容器学习(一):动手模拟spring的IoC
  • 原文地址:https://www.cnblogs.com/YrRoom/p/14164418.html
Copyright © 2011-2022 走看看