zoukankan      html  css  js  c++  java
  • MQTTNetServer

    Preparation

    Creating a MQTT server is similar to creating a MQTT client. The following code shows the most simple way of creating a new MQTT server with a TCP endpoint which is listening at the default port 1883.

    // Start a MQTT server.
    var mqttServer = new MqttFactory().CreateMqttServer();
    await mqttServer.StartAsync(new MqttServerOptions());
    Console.WriteLine("Press any key to exit.");
    Console.ReadLine();
    await mqttServer.StopAsync();

    Setting several options for the MQTT server is possible by setting the property values of the MqttServerOptions directly or via using the MqttServerOptionsBuilder (which is recommended). The following code shows how to use the MqttServerOptionsBuilder.

    // Configure MQTT server.
    var optionsBuilder = new MqttServerOptionsBuilder()
        .WithConnectionBacklog(100)
        .WithDefaultEndpointPort(1884);
    
    var mqttServer = new MqttFactory().CreateMqttServer();
    await mqttServer.StartAsync(optionsBuilder.Build());

    Validating MQTT clients

    The following code shows how to validate an incoming MQTT client connection request:

    // Setup client validator.
    var optionsBuilder = new MqttServerOptionsBuilder()
        .WithConnectionValidator(c =>
    {
        if (c.ClientId.Length < 10)
        {
            c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
            return;
        }
    
        if (c.Username != "mySecretUser")
        {
            c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
            return;
        }
    
        if (c.Password != "mySecretPassword")
        {
            c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
            return;
        }
    
        c.ReasonCode = MqttConnectReasonCode.Success;
    });

    Using a certificate

    In order to use an encrypted connection a certificate including the private key is required. The following code shows how to start a server using a certificate for encryption:

    using System.Reflection;
    using System.Security.Authentication;
    using System.Security.Cryptography.X509Certificates;
    ...
    
    var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
    var certificate = new X509Certificate2(Path.Combine(currentPath, "certificate.pfx"),"yourPassword", X509KeyStorageFlags.Exportable);
    
    var optionsBuilder = new MqttServerOptionsBuilder()
        .WithoutDefaultEndpoint() // This call disables the default unencrypted endpoint on port 1883
        .WithEncryptedEndpoint()
        .WithEncryptedEndpointPort(config.Port)
        .WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx))
        .WithEncryptionSslProtocol(SslProtocols.Tls12)

    But also other overloads getting a valid certificate blob (byte array) can be used.

    For creating a self-signed certificate for testing the following command can be used (Windows SDK must be installed):

    makecert.exe -sky exchange -r -n "CN=selfsigned.crt" -pe -a sha1 -len 2048 -ss My "test.cer"

    OpenSSL can also be used to create a self-signed PFX certificate as described here.

    Example:

    openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365
    openssl pkcs12 -export -out certificate.pfx -inkey key.pem -in cert.pem
    

    Publishing messages

    The server is also able to publish MQTT application messages. The object is the same as for the client implementation. Due to the fact that the server is able to publish its own messages it is not required having a loopback connection in the same process.

    This allows also running the server in a Windows IoT Core UWP app. This platform has a network isolation which makes it impossible to communicate via localhost etc.

    Examples for publishing a message are described at the client section of this Wiki.

    Consuming messages

    The server is also able to process every application message which was published by any client. The event ApplicationMessageReceived will be fired for every processed message. It has the same format as for the client but additionally has the ClientId.

    Details for consuming a application messages are described at the client section of this Wiki.

    Saving retained application messages

    The server supports retained MQTT messages. Those messages are kept and send to clients when they connect and subscribe to them. It is also supported to save all retained messages and loading them after the server has started. This required implementing an interface. The following code shows how to serialize retained messages as JSON:

    // Setting the options
    options.Storage = new RetainedMessageHandler();
    
    // The implementation of the storage:
    // This code uses the JSON library "Newtonsoft.Json".
    public class RetainedMessageHandler : IMqttServerStorage
    {
        private const string Filename = "C:\\MQTT\\RetainedMessages.json";
    
        public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
        {
            File.WriteAllText(Filename, JsonConvert.SerializeObject(messages));
            return Task.FromResult(0);
        }
    
        public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
        {
            IList<MqttApplicationMessage> retainedMessages;
            if (File.Exists(Filename))
            {
                var json = File.ReadAllText(Filename);
                retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
            }
            else
            {
                retainedMessages = new List<MqttApplicationMessage>();
            }
                
            return Task.FromResult(retainedMessages);
        }
    }

    Intercepting application messages

    A custom interceptor can be set at the server options. This interceptor is called for every application message which is received by the server. This allows extending application messages before they are persisted (in case of a retained message) and before being dispatched to subscribers. This allows use cases like adding a time stamp to every application message if the hardware device does not know the time or time zone etc. The following code shows how to use the interceptor:

    var optionsBuilder = new MqttServerOptionsBuilder()
        .WithApplicationMessageInterceptor(context =>
        {
            if (context.ApplicationMessage.Topic == "my/custom/topic")
            {
                context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
            }
    
            // It is possible to disallow the sending of messages for a certain client id like this:
            if (context.ClientId != "Someone")
            {
                context.AcceptPublish = false;
                return;
            }
            // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
            // This is useful when the IoT device has no own clock and the creation time of the message might be important.
        })
        .Build();

    If you want to stop processing an application message completely (like a delete) then the property context.ApplicationMessage.Payload must be set to null.

    Intercepting subscriptions

    A custom interceptor can be set to control which topics can be subscribed by a MQTT client. This allows moving private API-Topics to a protected area which is only available for certain clients. The following code shows how to use the subscription interceptor.

    // Protect several topics from being subscribed from every client.
    var optionsBuilder = new MqttServerOptionsBuilder()
        .WithSubscriptionInterceptor(context =>
        {
            if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
            {
                context.AcceptSubscription = false;
            }
    
            if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
            {
                context.AcceptSubscription = false;
                context.CloseConnection = true;
            }
        })
        .Build();

    It is also supported to use an async method instead of a synchronized one like in the above example.

    Storing data in the session

    From version 3.0.6 and up, there is a Dictionary<object, object> called SessionItems. It allows to store custom data in the session and is available in all interceptors:

    var optionsBuilder = new MqttServerOptionsBuilder()
    .WithConnectionValidator(c => { c.SessionItems.Add("SomeData", true); }
    .WithSubscriptionInterceptor(c => { c.SessionItems.Add("YourData", new List<string>{"a", "b"}); }
    .WithApplicationMessageInterceptor(c => { c.SessionItems.Add("Test", 123); }

    ASP.NET Core Integration

    ASP.NET Core 2.0

    This library also has support for a WebSocket based server which is integrated into ASP.NET Core 2.0. This functionality requires an additional library called MQTTnet.AspNetCore. After adding this library a MQTT server can be added to a Kestrel HTTP server.

    // In class _Startup_ of the ASP.NET Core 2.0 project.
    public void ConfigureServices(IServiceCollection services)
    {
         // This adds a hosted mqtt server to the services
         services.AddHostedMqttServer(builder => builder.WithDefaultEndpointPort(1883));
    
         // This adds TCP server support based on System.Net.Socket
         services.AddMqttTcpServerAdapter();
    
         // This adds websocket support
         services.AddMqttWebSocketServerAdapter();
    }
    
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        // This maps the websocket to an MQTT endpoint
        app.UseMqttEndpoint();
        // Other stuff
    }

    ASP.NET Core 2.1+

    MQTTnet.AspNetCore is compatible with the abstractions present in ASP.NET Core 2.0 but it also offers a new TCP transport based on ASP.NET Core 2.1 Microsoft.AspNetCore.Connections.Abstractions. This transport is mutual exclusive with the old TCP transport so you may only add and use one of them. Our benchmark indicates that the new transport is up to 30 times faster.

    // In class _Program_ of the ASP.NET Core 2.1 or 2.2 project.
    private static IWebHost BuildWebHost(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseKestrel(o => {
                o.ListenAnyIP(1883, l => l.UseMqtt()); // MQTT pipeline
                o.ListenAnyIP(5000); // Default HTTP pipeline
            })
        .UseStartup<Startup>()
        .Build();
    
    // In class _Startup_ of the ASP.NET Core 2.1 or 2.2 project.
    public void ConfigureServices(IServiceCollection services)
    {
         //this adds a hosted mqtt server to the services
         services.AddHostedMqttServer(builder => builder.WithDefaultEndpointPort(1883));
    
         //this adds tcp server support based on Microsoft.AspNetCore.Connections.Abstractions
         services.AddMqttConnectionHandler();
    
         //this adds websocket support
         services.AddMqttWebSocketServerAdapter();
    }

    ASP.NET Core 3.0+ (Since MQTT version 3.0.9)

    In ASP.NET Core 3.0+, the server can be configured like this. Remember, that the TLS middleware connection is not yet available, so this will only work for WebSocket connections (Check https://github.com/chkr1011/MQTTnet/issues/464).

    // In class _Program_ of the ASP.NET Core 3.0+ project.
    private static IWebHost BuildWebHost(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseKestrel(o => {
                o.ListenAnyIP(1883, l => l.UseMqtt()); // MQTT pipeline
                o.ListenAnyIP(5000); // Default HTTP pipeline
            })
        .UseStartup<Startup>()
        .Build();
    
    // In class _Startup_ of the ASP.NET Core 3.0+ project.
    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint())
            .AddMqttConnectionHandler()
            .AddConnections();
    }
    
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapMqtt("/mqtt");
        });
    
        app.UseMqttServer(server =>
        {
            // Todo: Do something with the server
        });
    }

    Windows IoT Core and UWP localhost loopback addresses

    In Windows IoT Core as well as in UWP, loopback connections (127.0.0.1) are not allowed. If you try to connect to a locally running server (broker), this will fail. See Communicating with localhost (loopback) for enable loopback in Windows 10 IoT Core and UWP-apps.

    Special notice for using the server project in Android

    Under Android, there is an issue with the default bound IP address. So you have to use the actual address of the device. Check the example below.

    IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
    IPAddress ipAddress = ipHostInfo.AddressList[0];
    
    var server = new MqttFactory().CreateMqttServer();
    server.StartAsync(new MqttServerOptionsBuilder()
        .WithDefaultEndpointBoundIPAddress(ipAddress)
        .WithDefaultEndpointBoundIPV6Address(IPAddress.None)
        .Build()).GetAwaiter().GetResult();

    Accessing the MQTT server in an ASP.NET MVC controller

    If we have an ASP.NET Core application that needs to send MQTT messages from an MVC controller, the MqttService singleton needs to be registered with dependency injection. The trick is to have two methods to correctly setup the MQTT part:

    1. Have MqttService implement all the interfaces needed to hook with MqttServer (like IMqttServerClientConnectedHandlerIMqttServerApplicationMessageInterceptor, etc.)

    2. Write a ConfigureMqttServerOptions(AspNetMqttServerOptionsBuilder options) method that sets up the current object as callback for the needed methods:

    public void ConfigureMqttServerOptions(AspNetMqttServerOptionsBuilder options)
    {
        options.WithConnectionValidator(this);
         options.WithApplicationMessageInterceptor(this);
    }
    
    1. Write a ConfigureMqttServer(IMqttServer mqtt) that stores the reference to the MQTT server for later use and setup the handlers:
    public void ConfigureMqttServer(IMqttServer mqtt)
    {
        this.mqtt = mqtt;
        mqtt.ClientConnectedHandler = this;
        mqtt.ClientDisconnectedHandler = this;
    }
    

    Then, in your Startup class configure and use the service.

    In ConfigureServices:

    services.AddSingleton<MqttService>();
    services.AddHostedMqttServerWithServices(options => {
        var s = options.ServiceProvider.GetRequiredService<MqttService>();
        s.ConfigureMqttServerOptions(options);
    });
    services.AddMqttConnectionHandler();
    services.AddMqttWebSocketServerAdapter();
    

    In Configure:

    app.UseMqttEndpoint();
    app.UseMqttServer(server => app.ApplicationServices.GetRequiredService<MqttService().ConfigureMqttServer(server));
    
  • 相关阅读:
    Hadoop--Map/Reduce实现多表链接
    map/reduce实现 排序
    Hadoop-Map/Reduce实现实现倒排索引
    虚拟机之仅主机模式(HostOnly)链接外网设置
    hadoop家族之mahout安装
    SQLserver中的常量与变量、判断循环语句
    sqlserver中的数据转换与子查询
    SQLserver中常用的函数及实例
    sqlserver的增删改查
    SQLserver数据库基础
  • 原文地址:https://www.cnblogs.com/zhaoqm999/p/13020836.html
Copyright © 2011-2022 走看看