zoukankan      html  css  js  c++  java
  • NetCore下搭建websocket集群方案

    介绍

    最近在做一个基于netcore的实时消息服务。最初选用的是ASP.NET Core SignalR,但是后来发现目前它并没有支持IOS的客户端,所以自己只好又基于websocket重新搭建了一套服务。

    因为前期已经使用了SignalR,所以我直接在原本的项目里面重新扩展了一套自定义websocket服务。

    在网上有一篇博文介绍了如何在Asp.net Core中使用中间件来管理websocket,我的大部分代码也是参考这篇文章。在这儿贴个链接

    在Asp.net Core中使用中间件来管理websocket

    自定义WebSocket 中间件

    要阅读ASP.NET Core中的WebSockets支持,可以在此处查看。如果你的项目跟我一样,已经使用了Signalr,那么你不需要在安装Microsoft.AspNetCore.WebSockets包,否则在项目开始前,

    需要安装此Nuget包。现在你可以自定义你自己的中间件了。

    /// <summary>
        /// websocket 协议扩展中间件
        /// </summary>
        public class CustomWebSocketMiddlewarr
        {
            private readonly RequestDelegate _next;
    
            public CustomWebSocketMiddlewarr(RequestDelegate next)
            {
                _next = next;
            }
    
            public async Task Invoke(HttpContext context, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler)
            {
                 if (context.WebSockets.IsWebSocketRequest)
                    {
                        string ConId = context.Request.Query["sign"];
                        if (!string.IsNullOrEmpty(ConId))
                        {
                            WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync();
                            CustomWebSocket userWebSocket = new CustomWebSocket()
                            {
                                WebSocket = webSocket,
                                ConId = ConId
                            };
                            wsFactory.Add(userWebSocket);
                        //await wsmHandler.SendInitialMessages(userWebSocket);
                        await Listen(context, userWebSocket, wsFactory, wsmHandler);
                            
                        }
                    }
                    else
                    {
                        context.Response.StatusCode = 400;
                    }
                
                await _next(context);
            }
         //监听客户端发送过来的消息
            private async Task Listen(HttpContext context, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler)
            {
                WebSocket webSocket = userWebSocket.WebSocket;
                var buffer = new byte[1024 * 4];
                WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
                while (!result.CloseStatus.HasValue)
                {
                    await wsmHandler.HandleMessage(result, buffer, userWebSocket, wsFactory);
                    buffer = new byte[1024 * 4];
                    result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
                }
                wsFactory.Remove(userWebSocket.ConId);
                await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
            }
        }

    在自定义的中间件中,首先判断是否是websocket请求,如果是的话,在查看是否有对应的sign标识,满足条件后进入后续的处理环节。

    简单讲解一下这里面的处理逻辑。因为我的项目中同时存在Signalr,而Signalr也会使用到websocket协议。但是Signalr的websocket请求传入的参数是id,所以我在这儿自定义了一个参数sign为了和Signalr

    做区分。那么这个sign是做什么用的呢? 其实sign是前端传过来的唯一标识,和此次连接对应,也可以理解为Signalr里面的connectionId。然后会把标识和对应websocket类到存入到一个list集合中。即代码

    中的  wsFactory.Add(userWebSocket)。

    CustomWebSocket是一个包含WebSocket和标识的类:

    public  class CustomWebSocket
        {
            
            public string ConId { get; set; }
    
            public WebSocket WebSocket { get; set; }
        }

    然后定义了一个Websocket工厂类,用来存取连接到服务的Websocket实例。

    //接口
    public
    interface ICustomWebSocketFactory { void Add(CustomWebSocket uws); void Remove(string conId); List<CustomWebSocket> All(); List<CustomWebSocket> Others(CustomWebSocket client); CustomWebSocket Client(string conId); }
      

    具体实现

    public class CustomWebSocketFactory: ICustomWebSocketFactory
        {
            List<CustomWebSocket> List;
            public CustomWebSocketFactory()
            {
                List = new List<CustomWebSocket>();
            }
            public void Add(CustomWebSocket uws)
            {
                List.Add(uws);
            }
            public void Remove(string conId)
            {
                List.Remove(Client(conId));
               
            }
            public List<CustomWebSocket> All()
            {
                return List;
            }
           
            public List<CustomWebSocket> Others(CustomWebSocket client)
            {
                return List.Where(c => c.ConId != client.ConId).ToList();
            }
            public CustomWebSocket Client(string conId)
            {
                var uws= List.FirstOrDefault(c => c.ConId == conId);
                return uws;
    
            }
        }

    可以看到最终我们存取websocket都是通过list来进行,所以在注入的时候一定要注意。注入成单例模式。

    services.AddSingleton<ICustomWebSocketFactory, CustomWebSocketFactory>();

    CustomWebSocketMessageHandle包含有关消息处理的逻辑(发送,接收)
    public interface ICustomWebSocketMessageHandler
        {
            Task SendInitialMessages(CustomWebSocket userWebSocket);
            Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory);
            Task SendMessageInfo(string conId, object data, ICustomWebSocketFactory wsFactory);
    
    
        }
    
    public   class CustomWebSocketMessageHandler:ICustomWebSocketMessageHandler
        {
            public async Task SendInitialMessages(CustomWebSocket userWebSocket)
            {
                WebSocket webSocket = userWebSocket.WebSocket;
                var msg = new CustomWebSocketMessage
                {
                    MessagDateTime = DateTime.Now,
                    Type = WSMessageType.连接响应
                };
    
                string serialisedMessage = JsonConvert.SerializeObject(msg);
                byte[] bytes = Encoding.ASCII.GetBytes(serialisedMessage);
                await webSocket.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Text, true, CancellationToken.None);
            }
            /// <summary>
            /// 推送消息到客户端
            /// </summary>
            /// <returns></returns>
            public async Task SendMessageInfo(string conId,object data, ICustomWebSocketFactory wsFactory)
            {
                var uws = wsFactory.Client(conId);
                CustomWebSocketMessage message = new CustomWebSocketMessage();
                message.DataInfo = data;
                message.Type = WSMessageType.任务数量;
                message.MessagDateTime = DateTime.Now;
                if (uws == null)
                {
                    //广播到其他集群节点
                    var listpush = new List<PushMsg>();
    
                    var push = new PushMsg()
                    {
                        sendjsonMsg = new WebSocketFanoutDto()
                        {
                            conId = conId,
                            data = message
                        },
                        exchangeName = "saas.reltimewsmes.exchange",
                        sendEnum = SendEnum.订阅模式
                    };
                    listpush.Add(push);
                    BTRabbitMQManage.PushMessageAsync(listpush);
                    return;
                }
               
                var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
                var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(message));
               await uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None);
            }
    
            /// <summary>
            /// 处理接收到的客户端信息
            /// </summary>
            /// <param name="result"></param>
            /// <param name="buffer"></param>
            /// <param name="userWebSocket"></param>
            /// <param name="wsFactory"></param>
            /// <returns></returns>
            public async Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory)
            {
                string msg = Encoding.UTF8.GetString(buffer);
                try
                {
                    var message = JsonConvert.DeserializeObject<CustomWebSocketMessage>(msg);
                    if (message.Type == WSMessageType.用户信息)
                    {
                        var logdto = JsonConvert.DeserializeObject<LoginInfoDto>(message.DataInfo.ToJsonString());
                        await InitUserInfo(logdto, userWebSocket, wsFactory);
                    }
                   
                }
                catch (Exception e)
                {
                    var exbuffer = Encoding.UTF8.GetBytes(e.Message);
                    var excount = Encoding.UTF8.GetByteCount(e.Message);
                    await userWebSocket.WebSocket.SendAsync(new ArraySegment<byte>(exbuffer, 0, excount), result.MessageType, result.EndOfMessage, CancellationToken.None);
                }
            }
            /// <summary>
            /// 初始化用户连接关系
            /// </summary>
            /// <param name="dto"></param>
            /// <param name="userWebSocket"></param>
            /// <param name="wsFactory"></param>
            /// <returns></returns>
            private async Task InitUserInfo(LoginInfoDto dto, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory)
            {
                if (dto.userId == 0)
                    return;
                var contectid = userWebSocket.ConId;
                var key = "";
                if (dto.tenantId.HasValue)
                    key += "T_" + dto.userId + "_" + dto.tenantId + "_" + "tenant_";
                if (dto.bankId.HasValue)
                    key += "B_" + dto.userId + "_" + dto.bankId + "_" + "bank_";
                key += dto.fromeType;
                //添加缓存
                CacheInstace<string>.GetRedisInstanceDefaultMemery().AddOrUpdate(key, contectid, r =>
                {
                    r = contectid;
                    return r;
                });
                CacheInstace<string>.GetRedisInstanceDefaultMemery().Expire(key, new TimeSpan(12, 0, 0));
               
            }
           
        }
    在这里面,推送消息到客户端的时候,如果未找到标识对应的Websocket对象,则将消息广播到所有的集群节点上。我们知道Signalr里面的集群实现通过redis来做的,但在此处,因为
    我项目里面已经搭建了Rabbitmq的高可用集群,所以我直接通过Rabbitmq来进行广播。这样不管我是在集群的那个节点上来推送消息,都可以保证消息被正确推送到客户端。
    关于广播消息的订阅实现:
     public class WebSocketFanoutDto
        {
            public string conId { get; set; }
    
            public CustomWebSocketMessage data { get; set; }
        }
    
     public class FanoutMesConsume : IMessageConsume
        {
            public void Consume(string message)
            {
                var condto = JsonConvert.DeserializeObject<WebSocketFanoutDto>(message);
                var wsFactory = IOCManage.ServiceProvider.GetService<ICustomWebSocketFactory>();
                var uws = wsFactory.Client(condto.conId);
                if (uws != null)
                {
                    //发送消息
                    var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(condto.data));
                    var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(condto.data));
                    uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None);
                }
            }
        }
    
    

    最后在扩展类里面添加消息监视和注入Websocket中间件。

    当然不要忘记 消息处理类的依赖注入

    services.AddSingleton<ICustomWebSocketMessageHandler, CustomWebSocketMessageHandler>();
    
    
     public static IApplicationBuilder UseCustomWebSocketManager(this IApplicationBuilder app)
            {
                //添加针对分布式集群的消息监视
                RabbitMQManage.Subscribe<FanoutMesConsume>(new MesArgs()
                {
                    exchangeName = "reltimewsmes.exchange",
                    sendEnum = SendEnum.订阅模式
                });
                return app.UseMiddleware<CustomWebSocketMiddlewarr>();
            }

    至此这个框架搭建完成,最后在startup类中注入。

    关于Rabbitmq的使用,发送和接收是我基于easynetq封装的一个帮助类,大家可以自行实现。

    这里面最主要的逻辑就是每一个websocket实例都有一个对应的标识,然后在连接成功后,前端会发送用户信息,后端服务再把用户信息和连接标识关联。这样如果想推送信息到某个用户的话,就可以通过

    用户信息来找到用户对应的连接信息。至于为什么整个流程会这么复杂的,就一言难尽(我能怎么办,我也很绝望啊)。大多数时候大家都可以直接通过token认证来绑定用户和socket连接。

    目前还有几个问题一个广播消息的时候,发送消息方也会收到这个消息,这挺尴尬,目前我还没想到太好的解决办法。

    第二个是采用单例list字段存储连接的websocket实例,少的时候还好,如果多的话,感觉可能会存在堆栈溢出的问题,但没实际测试过,所以目前还不知道最大的连接数多少。

  • 相关阅读:
    Django 三(路由)
    Django 二
    Django基础知识
    jquery值,属性,类的操作 文档的操作
    jquery事件对象 , 链式编程,选择器和过滤器,动画;js和jquery互转
    MySQL-1-简介-安装流程
    前端 里的面向对象
    篇二:JS身份证校验
    HTML5属性--(capture="camera") 上传照片或者打开手机相机
    SpringMVC @RequestBody接收Json对象字符串
  • 原文地址:https://www.cnblogs.com/dandan123/p/10059026.html
Copyright © 2011-2022 走看看