ABP 版本:4.3.0.0
.NET Core 版本:2.2
1、Mqtt
1.1 添加程序集:M2MqttDotnetCore(差点以为没有.net core 的)
1.2 实现代码:参考单例模式实现,并将"与服务器断开"和"消息接收"两个事件通过委托交给外层处理
/// <summary>
/// Wraps a single M2Mqtt <see cref="MqttClient"/> and exposes publish / subscribe helpers.
/// Message-received and connection-closed notifications are forwarded to the
/// <see cref="ReceivedMsg"/> and <see cref="ClosedCon"/> delegates so callers can react to them.
/// </summary>
public class MqttClientService
{
    private readonly IConfiguration _config;

    // Shared instance handed out by CreateInstance; volatile because it is read outside the lock.
    private static volatile MqttClientService _instance = null;
    private static readonly object LockHelper = new object();

    /// <summary>
    /// Returns the shared instance, creating it on first use (double-checked locking).
    /// </summary>
    /// <param name="config">Configuration used only when the instance is first created.</param>
    /// <returns>The shared <see cref="MqttClientService"/>.</returns>
    public static MqttClientService CreateInstance(IConfiguration config)
    {
        if (_instance == null)
        {
            lock (LockHelper)
            {
                if (_instance == null)
                {
                    _instance = new MqttClientService(config);
                }
            }
        }

        return _instance;
    }

    /// <summary>
    /// The underlying MQTT client used for publishing and subscribing.
    /// </summary>
    public MqttClient SubscribeClient { get; set; }

    /// <summary>
    /// Invoked with the sender and event args whenever the client receives a published message.
    /// </summary>
    public Action<Object, MqttMsgPublishEventArgs> ReceivedMsg { get; set; }

    /// <summary>
    /// Invoked when the connection to the broker is closed.
    /// Kept as a public field (not a property) so existing callers are unaffected.
    /// </summary>
    public Action<object, EventArgs> ClosedCon;

    /// <summary>
    /// Creates the MQTT client from the "MqttService" configuration section
    /// (ClientId, HostIP, Port), hooks its events and connects to the broker.
    /// </summary>
    /// <param name="config">Application configuration holding the MqttService settings.</param>
    public MqttClientService(IConfiguration config)
    {
        _config = config;

        // Read the client id and broker endpoint from configuration.
        string clientId = _config["MqttService:ClientId"];
        string hostIp = _config["MqttService:HostIP"];
        string port = _config["MqttService:Port"];

        // Create the client instance (non-secure connection, no SSL protocol).
        SubscribeClient = new MqttClient(
            IPAddress.Parse(hostIp),
            int.Parse(port),
            false,
            new X509Certificate(),
            new X509Certificate(),
            MqttSslProtocols.None);

        // Forward incoming messages to ReceivedMsg.
        SubscribeClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;

        // Forward broker disconnects to ClosedCon.
        SubscribeClient.ConnectionClosed += Client_ConnectionClosed;

        // NOTE(review): the value returned by Connect is not inspected here.
        SubscribeClient.Connect(clientId);

        // TODO: initialise subscriptions here, e.g. load every device that must be
        // subscribed from the database and subscribe to its topics.
    }

    /// <summary>
    /// Relays a received message to <see cref="ReceivedMsg"/>, if a handler is attached.
    /// </summary>
    void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
        ReceivedMsg?.Invoke(sender, e);
    }

    /// <summary>
    /// Relays a connection-closed notification to <see cref="ClosedCon"/>, if a handler is attached.
    /// </summary>
    private void Client_ConnectionClosed(object sender, EventArgs e)
    {
        ClosedCon?.Invoke(sender, e);
    }

    /// <summary>
    /// Publishes a UTF-8 encoded message with QoS 0 (at most once) and no retain flag.
    /// </summary>
    /// <param name="Topic">Topic to publish to.</param>
    /// <param name="Data">Message content.</param>
    public void Publish(string Topic, string Data)
    {
        SubscribeClient.Publish(Topic, Encoding.UTF8.GetBytes(Data), MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);
    }

    /// <summary>
    /// Subscribes to every topic in <paramref name="Topic"/> with QoS 2 (exactly once).
    /// </summary>
    /// <param name="Topic">Topics to subscribe to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="Topic"/> is null.</exception>
    public void Subscribe(string[] Topic)
    {
        if (Topic == null)
        {
            throw new ArgumentNullException(nameof(Topic));
        }

        // One QoS entry per topic: a fixed single-element QoS array only matches
        // a single-topic subscription and breaks as soon as several topics are passed.
        byte[] qosLevels = new byte[Topic.Length];
        for (int i = 0; i < qosLevels.Length; i++)
        {
            qosLevels[i] = MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE;
        }

        SubscribeClient.Subscribe(Topic, qosLevels);
    }

    /// <summary>
    /// Cancels the subscription for the given topics.
    /// </summary>
    /// <param name="Topic">Topics to unsubscribe from.</param>
    public void Unsubscribe(string[] Topic)
    {
        SubscribeClient.Unsubscribe(Topic);
    }
}
2、SignalR:Hub 继承 AbpHubBase 并实现 ISingletonDependency(以单例方式注册到依赖注入容器)。这里只是为了做测试,所以代码有点丑
/// <summary>
/// SignalR hub that bridges browser clients and the MQTT broker: clients can subscribe,
/// unsubscribe and publish through it, and MQTT events are pushed back to them
/// through the "getMessage" client method.
/// </summary>
public class MqttHub : AbpHubBase, ISingletonDependency
{
    // Exposes GetByUserIdOrNull, GetAllClients and IsOnline for online-user lookups.
    private readonly IOnlineClientManager _onlineClient;
    private readonly IConfiguration _config;

    // Users registered through AddOnlineUser; shared by all hub instances.
    private static readonly List<MqttUserModel> userList = new List<MqttUserModel>();

    // Guards userList, which hub calls from different connections may touch concurrently.
    private static readonly object UserListLock = new object();

    public MqttHub(IConfiguration config, IOnlineClientManager onlineClient) : base()
    {
        _config = config;
        _onlineClient = onlineClient;
    }

    /// <summary>
    /// Returns the shared MQTT service with this hub's handlers attached exactly once.
    /// </summary>
    /// <remarks>
    /// Removing before adding stops the handlers from piling up on every hub call,
    /// which would otherwise broadcast each MQTT message several times.
    /// NOTE(review): this de-duplicates per hub instance only, and the handlers use
    /// <c>Clients</c> outside a hub invocation; consider moving the forwarding to a
    /// service based on IHubContext.
    /// </remarks>
    private MqttClientService GetMqttService()
    {
        MqttClientService service = MqttClientService.CreateInstance(_config);

        service.ReceivedMsg -= MqttHelper_ReceivedMsg;
        service.ReceivedMsg += MqttHelper_ReceivedMsg;

        service.ClosedCon -= MqttHelper_ClosedCon;
        service.ClosedCon += MqttHelper_ClosedCon;

        return service;
    }

    /// <summary>
    /// Subscribes to an MQTT topic. The method name and parameters are what the
    /// browser client invokes and can be chosen freely.
    /// </summary>
    /// <param name="Topic">Topic to subscribe to.</param>
    public async Task SubscribeMessage(string Topic)
    {
        MqttClientService service = GetMqttService();
        await Task.Run(() => service.Subscribe(new string[] { Topic }));
    }

    /// <summary>
    /// Cancels the subscription to an MQTT topic.
    /// </summary>
    /// <param name="Topic">Topic to unsubscribe from.</param>
    public void Unsubscribe(string Topic)
    {
        MqttClientService service = GetMqttService();
        service.Unsubscribe(new string[] { Topic });
    }

    /// <summary>
    /// Publishes a message to an MQTT topic.
    /// </summary>
    /// <param name="Topic">Topic to publish to.</param>
    /// <param name="Data">Message content.</param>
    public void PublishMessage(string Topic, string Data)
    {
        MqttClientService service = GetMqttService();
        service.Publish(Topic, Data);
    }

    /// <summary>
    /// Registers the current session user as online (or refreshes the stored connection id),
    /// announces the user to all clients and greets the calling client.
    /// </summary>
    public void AddOnlineUser()
    {
        // Taken straight from the current login session.
        var userId = AbpSession.GetUserId();

        // Connection id of the calling browser client.
        var connectionId = Context.ConnectionId;

        MqttUserModel user;
        lock (UserListLock)
        {
            user = userList.FirstOrDefault(x => x.Id == userId);
            if (user == null)
            {
                // Keep the reference to the new entry: it is read below, and leaving
                // it null caused a NullReferenceException for every first-time user.
                user = new MqttUserModel
                {
                    ConnectionId = connectionId,
                    Id = userId,
                    UserName = AbpSession.GetUserName(),
                };
                userList.Add(user);
            }
            else
            {
                user.ConnectionId = connectionId;
            }
        }

        Clients.All.SendAsync("getMessage", new { msg = "当前登录用户:" + user.UserName + " " });

        // Addressed to the calling client only.
        Clients.Client(connectionId).SendAsync("getMessage", new { msg = "您好,欢迎登陆!" });
    }

    /// <summary>
    /// Tells every connected client that the MQTT broker connection was closed.
    /// </summary>
    private void MqttHelper_ClosedCon(object sender, EventArgs e)
    {
        Clients.All.SendAsync("getMessage", new { msg = "服务器已断开链接 " });
    }

    /// <summary>
    /// Forwards a message received from the MQTT broker to every connected client.
    /// </summary>
    private void MqttHelper_ReceivedMsg(object sender, MqttMsgPublishEventArgs e)
    {
        byte[] payload = e.Message;
        string text = System.Text.Encoding.UTF8.GetString(payload);

        // Clients.All targets every connected client, the caller included.
        // "getMessage" is just the client-side method name chosen for this demo.
        Clients.All.SendAsync("getMessage", new { msg = text + " " });
    }

    /// <summary>
    /// Broadcasts a chat-style message prefixed with the sender's user id.
    /// </summary>
    /// <param name="message">Text to broadcast.</param>
    public void SendMessage(string message)
    {
        Clients.All.SendAsync("getMessage", new { msg = string.Format("User {0}: {1}", AbpSession.UserId, message) });
    }

    /// <summary>
    /// Logs the connection id after the base connect handling has run.
    /// </summary>
    public async override Task OnConnectedAsync()
    {
        await base.OnConnectedAsync();
        Logger.Debug("A client connected to MyChatHub: " + Context.ConnectionId);
    }

    /// <summary>
    /// Logs the connection id after the base disconnect handling has run.
    /// </summary>
    /// <param name="exception">Passed through to the base implementation.</param>
    public async override Task OnDisconnectedAsync(Exception exception)
    {
        await base.OnDisconnectedAsync(exception);
        Logger.Debug("A client disconnected from MyChatHub: " + Context.ConnectionId);
    }
}
3、前端:
<div class="layui-fluid">
    @* Left column: log output for messages pushed by the hub. *@
    <div class="layui-col-sm3">
        <textarea id="TextArea1" rows="30" cols="900" style="width:100%;height:100%;max-width:inherit;"></textarea>
    </div>
    <div class="layui-col-sm6">
        <h5>订阅测试</h5>
        @* Broker address input, currently disabled:
           <p>服务器地址:</p> <p><input id="txtIP" type="text" /></p> *@
        <p>订阅主题:</p>
        <p><input id="txtTopic" type="text" value="12/deviceStatus/12190101999" /></p>
        <p><input id="btnSubscribe" type="button" value="订阅" /><input id="btnSubscribeNo" type="button" value="取消订阅" /></p>
        <h5>发布测试</h5>
        <p>发布主题:</p>
        <p><input id="txtTopicPub" type="text" /></p>
        <p>发布内容</p>
        <p><textarea id="txtPublish" rows="10" cols="20" style="width:100%;height:100%;max-width:inherit;"></textarea></p>
        <p><input id="btnPublic" type="button" value="发布" /></p>
        <input type="button" id="btn1" value="提交" />
    </div>
</div>
4、JS
// Hub connection; assigned once abp.signalr.startConnection hands it over.
var chatHub = null;

// Appends text to the log textarea. Uses .val() because jQuery's .text()
// is not meant for form controls such as <textarea>.
function appendLog(text) {
    var $log = $("#TextArea1");
    $log.val($log.val() + text);
}

abp.signalr.startConnection(abp.appPath + 'signalr-mqttHub', function (connection) {
    chatHub = connection; // Save a reference to the hub

    // Register for incoming messages pushed by the server.
    connection.on('getMessage', function (message) {
        appendLog(message.msg + "");
    });
}).then(function (connection) {
    appendLog("连接MyHub成功 ");
    abp.event.trigger('mqttHub.connected');
});

// Once connected, announce ourselves through the hub.
abp.event.on('mqttHub.connected', function () {
    chatHub.invoke('sendMessage', "Hi everybody, I'm connected to the chat!");
});

$("#btnSubscribe").click(function () {
    chatHub.invoke('subscribeMessage', $("#txtTopic").val());
});

$("#btnSubscribeNo").click(function () {
    chatHub.invoke('unsubscribe', $("#txtTopic").val());
});

$("#btnPublic").click(function () {
    // The hub method takes (topic, data); the topic was previously omitted.
    chatHub.invoke('publishMessage', $("#txtTopicPub").val(), $("#txtPublish").val());
});
5、Startup.Configure
// Map the SignalR hub endpoints.
app.UseSignalR(hubRoutes =>
{
    hubRoutes.MapHub<AbpCommonHub>("/signalr");

    // Custom hub paths are prefixed with '/signalr'.
    hubRoutes.MapHub<MqttHub>("/signalr-mqttHub");
});
原谅我写得太匆忙,15分钟居然没有写完,只能后面再补细节