zoukankan      html  css  js  c++  java
  • ABP .net Core MQTT+signalr通讯

    abp版本: 4.3.0.0

    .net core 版本 2.2

    1、Mqtt 

    1.1 添加程序集:M2MqttDotnetCore(差点以为没有.net core 的)

    2.2 实现代码:抄了个单例模式,并将服务器断开和消息接收事件委托给外层

    public class MqttClientService
        {
            private IConfiguration _config;
            private static volatile MqttClientService _instance = null;
    
            private static readonly object LockHelper = new object();
    
            /// <summary>
            /// 创建单例模式
            /// </summary>
            public static MqttClientService CreateInstance(IConfiguration config)
            {
                if (_instance == null)
                {
                    lock (LockHelper)
                    {
                        if (_instance == null)
                            _instance = new MqttClientService(config);
                    }
                }
                return _instance;
            }
    
            /// <summary>
            /// 实例化订阅客户端
            /// </summary>
            public MqttClient SubscribeClient { get; set; }
    
    
            public Action<Object, MqttMsgPublishEventArgs> ReceivedMsg { get; set; }
            public Action<object, EventArgs> ClosedCon;
    
            public MqttClientService(IConfiguration config)
            {
                _config = config;
                //生成客户端ID并连接服务器  
                string ClientId = _config["MqttService:ClientId"];
                string HostIP = _config["MqttService:HostIP"];
                string Port= _config["MqttService:Port"];
                // create client instance 
                SubscribeClient = new MqttClient(IPAddress.Parse(HostIP), int.Parse(Port), false, new X509Certificate(), new X509Certificate(), MqttSslProtocols.None);
    
                // 消息接收处理事件
                SubscribeClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
                //与服务器断开事件
                SubscribeClient.ConnectionClosed += Client_ConnectionClosed;
    
                SubscribeClient.Connect(ClientId);
    
                // 在这里初始化订阅,从数据库取出所有需要订阅的设备信息,进行订阅
                // SubscribeClient.Subscribe(new string[] { "avatar/uploaded" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
            }
    
            void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
            {
                // handle message received 
                ReceivedMsg?.Invoke(sender, e);
            }
    
            /// <summary>
            /// 与服务器断开
            /// </summary>
            private void Client_ConnectionClosed(object sender, EventArgs e)
            {
                ClosedCon?.Invoke(sender, e);
            }
    
            /// <summary>
            /// 发布
            /// </summary>
            /// <param name="Topic">发布的主题</param>
            /// <param name="Data">发布的消息内容</param>
            public void Publish(string Topic, string Data)
            {
                SubscribeClient.Publish(Topic, Encoding.UTF8.GetBytes(Data), MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);
            }
    
            /// <summary>
            /// 订阅
            /// </summary>
            /// <param name="Topic">订阅的主题</param>
            public void Subscribe(string[] Topic)
            {
                // 订阅主题"/home/temperature" 消息质量为 2(只有一次) 
                SubscribeClient.Subscribe(Topic, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
            }
    
            /// <summary>
            /// 取消订阅
            /// </summary>
            public void Unsubscribe(string[] Topic)
            {
                SubscribeClient.Unsubscribe(Topic);
            }
        }
    

      2.signalr,继承 AbpHubBase, ISingletonDependency(实现依赖注入的单例模式),这里只是为了做测试,所以代码有点丑

    public class MqttHub : AbpHubBase, ISingletonDependency//AbpHubBase
        {
            // 使用GetByUserIdOrNull、GetAllClients和IsOnline方法 获取在线用户信息
            private readonly IOnlineClientManager _onlineClient;
            private readonly IConfiguration _config;
            public MqttHub(IConfiguration config,IOnlineClientManager onlineClient):base()
            {
                _config = config;
                _onlineClient = onlineClient;
            }
    
            static List<MqttUserModel> userList = new List<MqttUserModel>();
            /// <summary>
            /// 订阅 这个方法名字是自定义的。参数也是自定义的
            /// </summary>
            public async Task SubscribeMessage(string Topic)
            {
                MqttClientService service = MqttClientService.CreateInstance(_config);
                service.ReceivedMsg += MqttHelper_ReceivedMsg;
                service.ClosedCon += MqttHelper_ClosedCon;
                await Task.Run(() => service.Subscribe(new string[] { Topic }));
            }
            /// <summary>
            /// 取消订阅
            /// </summary>
            /// <param name="Topic"></param>
            public void Unsubscribe(string Topic)
            {
                MqttClientService service = MqttClientService.CreateInstance(_config);
                service.ReceivedMsg += MqttHelper_ReceivedMsg;
                service.ClosedCon += MqttHelper_ClosedCon;
                service.Unsubscribe(new string[] { Topic });
            }
            /// <summary>
            /// 发布
            /// </summary>
            /// <param name="Topic"></param>
            /// <param name="Data"></param>
            public void PublishMessage(string Topic, string Data)
            {
                MqttClientService service = MqttClientService.CreateInstance(_config);
                service.ReceivedMsg += MqttHelper_ReceivedMsg;
                service.ClosedCon += MqttHelper_ClosedCon;
                service.Publish(Topic, Data);//发布
    
                var clientConnectionId = Context.ConnectionId; //这是与我连接的客户端的连接ID(浏览器端)
            }
            /// <summary>
            /// 添加在线人员
            /// </summary>
            public void AddOnlineUser()
            {
                //直接从当前登录者信息里面取
                var user = userList.FirstOrDefault(x => x.Id == AbpSession.GetUserId());
                if (user == null)
                {
                    //添加在线人员
                    userList.Add(new MqttUserModel
                    {
                        ConnectionId = Context.ConnectionId,
                        Id = AbpSession.GetUserId(),//随机用户id
                        UserName = AbpSession.GetUserName(),
                    });
                }
                else
                {
                    user.ConnectionId = Context.ConnectionId;
                }
                Clients.All.SendAsync("getMessage",new { msg = "当前登录用户:" + user.UserName + "
    " });
    
                var clientConnectionId = Context.ConnectionId; //这是与我连接的客户端的连接ID(浏览器端)
                Clients.Client(clientConnectionId).SendAsync("getMessage", new { msg = "您好,欢迎登陆!" });//指定接收者
            }
            /// <summary>
            /// 
            /// </summary>
            private void MqttHelper_ClosedCon(object sender, EventArgs e)
            {
                Clients.All.SendAsync("getMessage", new { msg = "服务器已断开链接
    " });
            }
            /// <summary>
            /// 接收到服务器端的返回
            /// </summary>>
            private void MqttHelper_ReceivedMsg(object sender, MqttMsgPublishEventArgs e)
            {
                byte[] b = e.Message;
                string str = System.Text.Encoding.UTF8.GetString(b);
    
                //All表示监听所有连接上来的客户端。
                //getMessage是一个动态的方法,名字我们可以随意定的。这里我仅仅是给他取名叫getMessage而已,我们也可以叫Clients.All.ABC();
                Clients.All.SendAsync("getMessage", new { msg = str + "
    " });//调用所有连接上来的客户端(包括自己)监听的getMessage事件。All是一个dynamic属性,所以可以随意的监听
            }
    
            public void SendMessage(string message)
            {
                Clients.All.SendAsync("getMessage", new { msg = string.Format("User {0}: {1}", AbpSession.UserId, message) });
            }
    
            /// <summary>
            /// 
            /// </summary>
            /// <returns></returns>
            public async override Task OnConnectedAsync()
            {
                await base.OnConnectedAsync();
                Logger.Debug("A client connected to MyChatHub: " + Context.ConnectionId);
            }
    
            /// <summary>
            /// 重写父类OnDisconnected方法 :OnConnected方法客户端断开连接的时候会调用此方法
            /// </summary>
            /// <param name="exception"></param>
            /// <returns></returns>
            public async override Task OnDisconnectedAsync(Exception exception)
            {
                await base.OnDisconnectedAsync(exception);
                Logger.Debug("A client disconnected from MyChatHub: " + Context.ConnectionId);
            }
    

      3、前端:

        <div class="layui-fluid">
            <div class="layui-col-sm3">
                <textarea id="TextArea1" rows="30" cols="900" style=" 100%;height:100%;max-inherit;"></textarea>
            </div>
            <div class="layui-col-sm6">
                <h5>订阅测试</h5>
                @*<p>服务器地址:</p>
                    <p><input id="txtIP" type="text" /></p>*@
                <p>订阅主题:</p>
                <p><input id="txtTopic" type="text" value="12/deviceStatus/12190101999" /></p>
                <p><input id="btnSubscribe" type="button" value="订阅" /><input id="btnSubscribeNo" type="button" value="取消订阅" /></p>
                <h5>发布测试</h5>
                <p>发布主题:</p>
                <p><input id="txtTopicPub" type="text" /></p>
                <p>发布内容</p>
                <p><textarea id="txtPublish" rows="10" cols="20" style=" 100%;height:100%;max-inherit;"></textarea></p>
                <p><input id="btnPublic" type="button" value="发布" /></p>
                <input type="button" id="btn1" value="提交" />
            </div>
        </div>
    

      4 JS

    var chatHub = null;
    
                abp.signalr.startConnection(abp.appPath + 'signalr-mqttHub', function (connection) {
                    chatHub = connection; // Save a reference to the hub
    
                    connection.on('getMessage', function (message) { // Register for incoming messages
                        $("#TextArea1").text($("#TextArea1").text() + message.msg + "");
                        //console.log('received message: ' + message);
                    });
                }).then(function (connection) {
                    $("#TextArea1").text($("#TextArea1").text() + "连接MyHub成功
    ");
                    //abp.log.debug('Connected to mqttHub server!');
                    abp.event.trigger('mqttHub.connected');
                });
    
                abp.event.on('mqttHub.connected', function () { // Register for connect event
                    chatHub.invoke('sendMessage', "Hi everybody, I'm connected to the chat!"); // Send a message to the server
                });
    
                $("#btnSubscribe").click(function () {
                    chatHub.invoke('subscribeMessage', $("#txtTopic").val());
                });
    
                $("#btnSubscribeNo").click(function () {
                    chatHub.invoke('unsubscribe', $("#txtTopic").val());
                });
    
                $("#btnPublic").click(function () {
                    chatHub.invoke('publishMessage', $("#txtPublish").val())
                });
    

      5.Startup Configure

     app.UseSignalR(routes =>
                {
                    routes.MapHub<AbpCommonHub>("/signalr");
                    routes.MapHub<MqttHub>("/signalr-mqttHub"); // Prefix with '/signalr'
                });
    

      原谅我写得太匆忙,15分钟居然没有写完,只能后面再补细节

  • 相关阅读:
    Python中把数据存入csv文件
    Python中把字典和值取出来
    scrapy输出请求状态码
    scrapy发送post请求获取cookie
    Python3 Unicode转中文
    Quartus13.0破解方法
    元音字母A的发音规则
    位bit——字节Byte???
    曾经的小孩在努力奔跑!
    如何看懂的时序图?
  • 原文地址:https://www.cnblogs.com/bamboo-zhang/p/11507815.html
Copyright © 2011-2022 走看看