environment
.net core 3.1
前言
项目要求弄一个即时通讯
由于.net 已经集成了websocket通讯中间件-signalR,并且运作的效率还可以,为减少开发周期便使用了signalR作为websocket连接管理中间件。
既然是通讯,那就要解决最基本的连接问题。
如何连?以什么作为凭证?
既然是用户与用户之间的通信,那边应该用用户标识作为凭证进行连接,无标识的连接(游客)将毫无意义。
然后一般情况下(参考qq/微信),通讯时是不允许有一个用户多个通讯连接的。既然如此,那便要考虑一个用户二次连接的问题:
在这里,我们选择了第二次登录,将之前登录的用户强制退出。
退出的方式有两种:
- 客户端自己断开
- 服务端强制客户断开
随意一点的(自己玩的那种)就客户端自己断开就足够了,但如果是正式的项目的话还是要强制断开,保证操作的完整性。
好,既然要强制断开,那便是服务端移除连接.
先说结果:
一、在会话中断开
会话中是指服务端已获取到连接
使用场景:指客户发送一个[关闭指令],然后服务端自动关闭连接
在Microsoft.AspNetCore.SignalR.Hub
中有一个public HubCallerContext Context { get; set; }
表示调用方的上下文。
然后在Microsoft.AspNetCore.SignalR.HubCallerContext
中提供了一个方法:
//
// Summary:
// Aborts the connection.
public abstract void Abort();// --> 使中止,中断连接
故只需要在对应的方法块中使用Context.Abort();
便可断开连接
二、在会话外断开
会话外是指服务端还未获取到连接
使用场景:用户在小米登录了账号然后又在华为登录了账号,此时小米的账号应该被强制下线。
根据场景触发事件是华为登录了账号,此时你不清楚小米的连接
于是我们要使用一个集合保留设备-》连接的映射关系:
ConcurrentDictionary<string, HubCallerContext> _connections // 此处key为连接凭证,value为此连接对应的上下文
注:HubCallerContext
是一个单例对象,即一个连接中的所有HubCallerContext
都是一样的,故此方法可行
然后在连接开启时即Hub.OnConnectedAsync
时维护此集合,若是存在一个用户对应多个连接,你还需要维护一个用户->连接凭证的集合
然后在华为连接触发OnConnectedAsync
时,检查此集合是否已存在此凭证,若存在则取出对应上下文-HubCallerContext
调用Abort
进行强制退出
三、源码分析
ps:若是你只是想知道服务端怎么强制断开连接的话,下面就不用看了
由于百度、Google都没搜到需要的结果,只好自己来了...
强制断开即是服务端移除连接
首先,想要释放便得知道连接保存在哪
自己写过websocket的应该都知道,当连接建立后,服务端需要将连接进行保存避免自动释放,那么signalR既然是封装了websocket,那么必然也存在类似的操作
贴一下signalR service注册部分: Microsoft.Extensions.DependencyInjection.SignalRDependencyInjectionExtensions
services.TryAddSingleton<SignalRMarkerService>();
services.TryAddSingleton<SignalRCoreMarkerService>();
services.TryAddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>));
services.TryAddSingleton(typeof(IHubProtocolResolver), typeof(DefaultHubProtocolResolver));
services.TryAddSingleton(typeof(IHubContext<>), typeof(HubContext<>));
services.TryAddSingleton(typeof(IHubContext<, >), typeof(HubContext<, >));
services.TryAddSingleton(typeof(HubConnectionHandler<>), typeof(HubConnectionHandler<>));
services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));
services.TryAddSingleton(typeof(HubDispatcher<>), typeof(DefaultHubDispatcher<>));
services.TryAddScoped(typeof(IHubActivator<>), typeof(DefaultHubActivator<>));
services.AddAuthorization();
SignalRServerBuilder signalRServerBuilder = new SignalRServerBuilder(services);
signalRServerBuilder.AddJsonProtocol();
先看app使用hub的地方:
app.UseEndpoints(endpoints =>
{
endpoints.MapHub<XxxHub>("/xxxHub");
});
navigation->Microsoft.AspNetCore.Builder.HubEndpointRouteBuilderExtensions.HubEndpointRouteBuilderExtensions
public static class HubEndpointRouteBuilderExtensions { public static HubEndpointConventionBuilder MapHub<THub>(this IEndpointRouteBuilder endpoints, string pattern) where THub : Hub { return endpoints.MapHub<THub>(pattern, null); }
public static HubEndpointConventionBuilder MapHub<THub>(this IEndpointRouteBuilder endpoints, string pattern, Action<HttpConnectionDispatcherOptions> configureOptions) where THub : Hub { if (endpoints.ServiceProvider.GetService<SignalRMarkerService>() == null) { throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling 'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code."); } HttpConnectionDispatcherOptions httpConnectionDispatcherOptions = new HttpConnectionDispatcherOptions(); configureOptions?.Invoke(httpConnectionDispatcherOptions); ConnectionEndpointRouteBuilder connectionEndpointRouteBuilder = endpoints.MapConnections(pattern, httpConnectionDispatcherOptions, delegate(IConnectionBuilder b) { b.UseHub<THub>(); }); object[] attributes = typeof(THub).GetCustomAttributes(inherit: true); connectionEndpointRouteBuilder.Add(delegate(EndpointBuilder e) { object[] array = attributes; foreach (object item in array) { e.Metadata.Add(item); } e.Metadata.Add(new HubMetadata(typeof(THub))); }); return new HubEndpointConventionBuilder(connectionEndpointRouteBuilder); }
}
key code : b.UseHub<THub>();
navigation -> Microsoft.AspNetCore.SignalR.SignalRConnectionBuilderExtensions.SignalRConnectionBuilderExtensions
public static class SignalRConnectionBuilderExtensions
{
public static IConnectionBuilder UseHub<THub>(this IConnectionBuilder connectionBuilder) where THub : Hub
{
if (connectionBuilder.ApplicationServices.GetService(typeof(SignalRCoreMarkerService)) == null)
{
throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling 'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code.");
}
return connectionBuilder.UseConnectionHandler<HubConnectionHandler<THub>>();
}
}
navigation -> Microsoft.AspNetCore.Connections.ConnectionBuilderExtensions.ConnectionBuilderExtensions
public static IConnectionBuilder UseConnectionHandler<TConnectionHandler>(this IConnectionBuilder connectionBuilder) where TConnectionHandler : ConnectionHandler
{
TConnectionHandler handler = ActivatorUtilities.GetServiceOrCreateInstance<TConnectionHandler>(connectionBuilder.ApplicationServices);
return connectionBuilder.Run((ConnectionContext connection) => handler.OnConnectedAsync(connection));
}
OnConnectedAsync!!!,见名思以当连接打开时触发,这个应该就是关键点了
navigation -> Microsoft.AspNetCore.Connections.ConnectionHandler
public override async Task OnConnectedAsync(ConnectionContext connection) { IList<string> list = _hubOptions.SupportedProtocols ?? _globalHubOptions.SupportedProtocols; if (list == null || list.Count == 0)// 未配置连接协议 { throw new InvalidOperationException("There are no supported protocols"); } // 超时时间 TimeSpan timeout = _hubOptions.HandshakeTimeout ?? _globalHubOptions.HandshakeTimeout ?? HubOptionsSetup.DefaultHandshakeTimeout;
// 连接上下文配置 HubConnectionContextOptions contextOptions = new HubConnectionContextOptions { KeepAliveInterval = (_hubOptions.KeepAliveInterval ?? _globalHubOptions.KeepAliveInterval ?? HubOptionsSetup.DefaultKeepAliveInterval), ClientTimeoutInterval = (_hubOptions.ClientTimeoutInterval ?? _globalHubOptions.ClientTimeoutInterval ?? HubOptionsSetup.DefaultClientTimeoutInterval), StreamBufferCapacity = (_hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? 10), MaximumReceiveMessageSize = _maximumMessageSize }; Log.ConnectedStarting(_logger); // **** 构建连接对象 HubConnectionContext connectionContext = new HubConnectionContext(connection, contextOptions, _loggerFactory); IReadOnlyList<string> supportedProtocols = (list as IReadOnlyList<string>) ?? list.ToList(); // 然后进行握手连接 if (await connectionContext.HandshakeAsync(timeout, supportedProtocols, _protocolResolver, _userIdProvider, _enableDetailedErrors)) { // 握手成功 try { await _lifetimeManager.OnConnectedAsync(connectionContext); await RunHubAsync(connectionContext); } finally { Log.ConnectedEnding(_logger); await _lifetimeManager.OnDisconnectedAsync(connectionContext); } }
}
主要看握手成功之后的内容:
try
{
await _lifetimeManager.OnConnectedAsync(connectionContext);
await RunHubAsync(connectionContext);
}
finally
{
Log.ConnectedEnding(_logger);
await _lifetimeManager.OnDisconnectedAsync(connectionContext);
}
首先可以看到在finally
中调用了OnDisconnectedAsync
,见名思以我觉得它应该就是我们要找的释放连接,查看定义:
private readonly HubLifetimeManager<THub> _lifetimeManager;
而且通过之前的注册来看此成员是一个单例,感觉非常符合,继续查看定义: Microsoft.AspNetCore.SignalR.HubLifetimeManager
public abstract class HubLifetimeManager<THub> where THub : Hub { public abstract Task OnConnectedAsync(HubConnectionContext connection);
public abstract Task OnDisconnectedAsync(HubConnectionContext connection); public abstract Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default(CancellationToken)); public abstract Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default(CancellationToken));
}
到此有点小懵逼了???,这里的方法都是返回Task
,但我释放连接需要HubConnectionContext
,岂不是无解???
虽然很像但是不是就很郁闷,既然_lifetimeManager
做不了,就只能去看:
await RunHubAsync(connectionContext);
private async Task RunHubAsync(HubConnectionContext connection)
{
try
{
await _dispatcher.OnConnectedAsync(connection);
}
catch (Exception exception)
{
Log.ErrorDispatchingHubEvent(_logger, "OnConnectedAsync", exception);
await SendCloseAsync(connection, exception, allowReconnect: false);
return;
}
try
{
await DispatchMessagesAsync(connection);
}
catch (OperationCanceledException)
{
}
catch (Exception exception2)
{
Log.ErrorProcessingRequest(_logger, exception2);
await HubOnDisconnectedAsync(connection, exception2);
return;
}
await HubOnDisconnectedAsync(connection, null);
}
一个一个来,先看await _dispatcher.OnConnectedAsync(connection);
:
navigation -> Microsoft.AspNetCore.SignalR.Internal.DefaultHubDispatcher
public override async Task OnConnectedAsync(HubConnectionContext connection) { IServiceScope scope = null; try { // 通过 service 拿到了THub scope = _serviceScopeFactory.CreateScope(); IHubActivator<THub> hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub>>(); THub hub = hubActivator.Create(); try { // 然后通过hub 和 连接进行初始化 InitializeHub(hub, connection); await hub.OnConnectedAsync(); } finally { hubActivator.Release(hub); } } finally { await scope.DisposeAsync(); } }
private void InitializeHub(THub hub, HubConnectionContext connection)
{
hub.Clients = new HubCallerClients(_hubContext.Clients, connection.ConnectionId); // 只用到了ConnectionId显然不是
hub.Context = connection.HubCallerContext; // 这个就有点可疑了
hub.Groups = _hubContext.Groups;// 只用到了分组应该也不是
}
查看HubConnectionContext
的构造方法看看HubCallerContext
是如何被构造的:
public HubConnectionContext(ConnectionContext connectionContext, HubConnectionContextOptions contextOptions, ILoggerFactory loggerFactory)
{
...
HubCallerContext = new DefaultHubCallerContext(this);
...
}
navigation -> Microsoft.AspNetCore.SignalR.Internal.DefaultHubCallerContext
internal class DefaultHubCallerContext : HubCallerContext { private readonly HubConnectionContext _connection;
public override string ConnectionId => _connection.ConnectionId; public override string UserIdentifier => _connection.UserIdentifier; public override ClaimsPrincipal User => _connection.User; public override IDictionary<object, object> Items => _connection.Items; public override IFeatureCollection Features => _connection.Features; public override CancellationToken ConnectionAborted => _connection.ConnectionAborted; public DefaultHubCallerContext(HubConnectionContext connection) { _connection = connection; } public override void Abort() { // ************************ _connection.Abort(); }
}
Abort -> 使中止 推测是中止连接,而且通过源码可知调的是HubConnectionContext.Abort
.
Hub中的定义:
public HubCallerContext Context
{
get
{
CheckDisposed();
return _context;
}
set
{
CheckDisposed();
_context = value;
}
}
通过测试结果可知,这个便是服务器中断连接的方法了
[Over~]