zoukankan      html  css  js  c++  java
  • .net core signalR 服务端断开连接

    .net core signalR 服务端断开连接

    environment

    .net core 3.1

    前言

    项目要求弄一个即时通讯

    由于.net 已经集成了websocket通讯中间件-signalR,并且运作的效率还可以,为减少开发周期便使用了signalR作为websocket连接管理中间件。

    既然是通讯,那就要解决最基本的连接问题。

    如何连?以什么作为凭证?

    既然是用户与用户之间的通信,那边应该用用户标识作为凭证进行连接,无标识的连接(游客)将毫无意义。

    然后一般情况下(参考qq/微信),通讯时是不允许有一个用户多个通讯连接的。既然如此,那便要考虑一个用户二次连接的问题:

    在这里,我们选择了第二次登录,将之前登录的用户强制退出。

    退出的方式有两种:

    • 客户端自己断开
    • 服务端强制客户断开

    随意一点的(自己玩的那种)就客户端自己断开就足够了,但如果是正式的项目的话还是要强制断开,保证操作的完整性。

    好,既然要强制断开,那便是服务端移除连接.

    先说结果:

    一、在会话中断开

    会话中是指服务端已获取到连接

    使用场景:指客户发送一个[关闭指令],然后服务端自动关闭连接

    Microsoft.AspNetCore.SignalR.Hub中有一个public HubCallerContext Context { get; set; }表示调用方的上下文。

    然后在Microsoft.AspNetCore.SignalR.HubCallerContext中提供了一个方法:

    //
    // Summary:
    //     Aborts the connection.
    public abstract void Abort();// --> 使中止,中断连接
    

    故只需要在对应的方法块中使用Context.Abort();便可断开连接

    二、在会话外断开

    会话外是指服务端还未获取到连接

    使用场景:用户在小米登录了账号然后又在华为登录了账号,此时小米的账号应该被强制下线。

    根据场景触发事件是华为登录了账号,此时你不清楚小米的连接

    于是我们要使用一个集合保留设备-》连接的映射关系:

    ConcurrentDictionary<string, HubCallerContext> _connections // 此处key为连接凭证,value为此连接对应的上下文
    

    注:HubCallerContext是一个单例对象,即一个连接中的所有HubCallerContext都是一样的,故此方法可行

    然后在连接开启时即Hub.OnConnectedAsync时维护此集合,若是存在一个用户对应多个连接,你还需要维护一个用户->连接凭证的集合

    然后在华为连接触发OnConnectedAsync时,检查此集合是否已存在此凭证,若存在则取出对应上下文-HubCallerContext调用Abort进行强制退出

    三、源码分析

    ps:若是你只是想知道服务端怎么强制断开连接的话,下面就不用看了

    由于百度、Google都没搜到需要的结果,只好自己来了...

    强制断开即是服务端移除连接

    首先,想要释放便得知道连接保存在哪

    自己写过websocket的应该都知道,当连接建立后,服务端需要将连接进行保存避免自动释放,那么signalR既然是封装了websocket,那么必然也存在类似的操作

    贴一下signalR service注册部分: Microsoft.Extensions.DependencyInjection.SignalRDependencyInjectionExtensions

    services.TryAddSingleton<SignalRMarkerService>();
    

    services.TryAddSingleton<SignalRCoreMarkerService>();
    services.TryAddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>));
    services.TryAddSingleton(typeof(IHubProtocolResolver), typeof(DefaultHubProtocolResolver));
    services.TryAddSingleton(typeof(IHubContext<>), typeof(HubContext<>));
    services.TryAddSingleton(typeof(IHubContext<, >), typeof(HubContext<, >));
    services.TryAddSingleton(typeof(HubConnectionHandler<>), typeof(HubConnectionHandler<>));
    services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));
    services.TryAddSingleton(typeof(HubDispatcher<>), typeof(DefaultHubDispatcher<>));
    services.TryAddScoped(typeof(IHubActivator<>), typeof(DefaultHubActivator<>));
    services.AddAuthorization();
    SignalRServerBuilder signalRServerBuilder = new SignalRServerBuilder(services);
    signalRServerBuilder.AddJsonProtocol();

    先看app使用hub的地方:

    app.UseEndpoints(endpoints =>
    {
        endpoints.MapHub<XxxHub>("/xxxHub");
    });
    

    navigation->Microsoft.AspNetCore.Builder.HubEndpointRouteBuilderExtensions.HubEndpointRouteBuilderExtensions

    public static class HubEndpointRouteBuilderExtensions
    {
        public static HubEndpointConventionBuilder MapHub<THub>(this IEndpointRouteBuilder endpoints, string pattern) where THub : Hub
        {
            return endpoints.MapHub<THub>(pattern, null);
        }
    
    public static HubEndpointConventionBuilder MapHub&lt;THub&gt;(this IEndpointRouteBuilder endpoints, string pattern, Action&lt;HttpConnectionDispatcherOptions&gt; configureOptions) where THub : Hub
    {
        if (endpoints.ServiceProvider.GetService&lt;SignalRMarkerService&gt;() == null)
        {
            throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling 'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code.");
        }
        HttpConnectionDispatcherOptions httpConnectionDispatcherOptions = new HttpConnectionDispatcherOptions();
        configureOptions?.Invoke(httpConnectionDispatcherOptions);
        ConnectionEndpointRouteBuilder connectionEndpointRouteBuilder = endpoints.MapConnections(pattern, httpConnectionDispatcherOptions, delegate(IConnectionBuilder b)
        {
            b.UseHub&lt;THub&gt;();
        });
        object[] attributes = typeof(THub).GetCustomAttributes(inherit: true);
        connectionEndpointRouteBuilder.Add(delegate(EndpointBuilder e)
        {
            object[] array = attributes;
            foreach (object item in array)
            {
                e.Metadata.Add(item);
            }
            e.Metadata.Add(new HubMetadata(typeof(THub)));
        });
        return new HubEndpointConventionBuilder(connectionEndpointRouteBuilder);
    }
    

    }

    key code : b.UseHub<THub>();

    navigation -> Microsoft.AspNetCore.SignalR.SignalRConnectionBuilderExtensions.SignalRConnectionBuilderExtensions

    public static class SignalRConnectionBuilderExtensions
    {
        public static IConnectionBuilder UseHub<THub>(this IConnectionBuilder connectionBuilder) where THub : Hub
        {
            if (connectionBuilder.ApplicationServices.GetService(typeof(SignalRCoreMarkerService)) == null)
            {
                throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling 'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code.");
            }
            return connectionBuilder.UseConnectionHandler<HubConnectionHandler<THub>>();
        }
    }
    

    navigation -> Microsoft.AspNetCore.Connections.ConnectionBuilderExtensions.ConnectionBuilderExtensions

    public static IConnectionBuilder UseConnectionHandler<TConnectionHandler>(this IConnectionBuilder connectionBuilder) where TConnectionHandler : ConnectionHandler
    {
        TConnectionHandler handler = ActivatorUtilities.GetServiceOrCreateInstance<TConnectionHandler>(connectionBuilder.ApplicationServices);
        return connectionBuilder.Run((ConnectionContext connection) => handler.OnConnectedAsync(connection));
    }
    

    OnConnectedAsync!!!,见名思以当连接打开时触发,这个应该就是关键点了

    navigation -> Microsoft.AspNetCore.Connections.ConnectionHandler

    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        IList<string> list = _hubOptions.SupportedProtocols ?? _globalHubOptions.SupportedProtocols;
        if (list == null || list.Count == 0)// 未配置连接协议
        {
            throw new InvalidOperationException("There are no supported protocols");
        }
        // 超时时间
        TimeSpan timeout = _hubOptions.HandshakeTimeout ?? _globalHubOptions.HandshakeTimeout ?? HubOptionsSetup.DefaultHandshakeTimeout;
    
    // 连接上下文配置
    HubConnectionContextOptions contextOptions = new HubConnectionContextOptions
    {
        KeepAliveInterval = (_hubOptions.KeepAliveInterval ?? _globalHubOptions.KeepAliveInterval ?? HubOptionsSetup.DefaultKeepAliveInterval),
        ClientTimeoutInterval = (_hubOptions.ClientTimeoutInterval ?? _globalHubOptions.ClientTimeoutInterval ?? HubOptionsSetup.DefaultClientTimeoutInterval),
        StreamBufferCapacity = (_hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? 10),
        MaximumReceiveMessageSize = _maximumMessageSize
    };
    Log.ConnectedStarting(_logger);
    // **** 构建连接对象
    HubConnectionContext connectionContext = new HubConnectionContext(connection, contextOptions, _loggerFactory);
    IReadOnlyList&lt;string&gt; supportedProtocols = (list as IReadOnlyList&lt;string&gt;) ?? list.ToList();
    
    // 然后进行握手连接
    if (await connectionContext.HandshakeAsync(timeout, supportedProtocols, _protocolResolver, _userIdProvider, _enableDetailedErrors))
    { // 握手成功
        try
        {
            await _lifetimeManager.OnConnectedAsync(connectionContext);
            await RunHubAsync(connectionContext);
        }
        finally
        {
            Log.ConnectedEnding(_logger);
            await _lifetimeManager.OnDisconnectedAsync(connectionContext);
        }
    }
    

    }

    主要看握手成功之后的内容:

    try
    {
        await _lifetimeManager.OnConnectedAsync(connectionContext);
        await RunHubAsync(connectionContext);
    }
    finally
    {
        Log.ConnectedEnding(_logger);
        await _lifetimeManager.OnDisconnectedAsync(connectionContext);
    }
    

    首先可以看到在finally中调用了OnDisconnectedAsync,见名思以我觉得它应该就是我们要找的释放连接,查看定义:

    private readonly HubLifetimeManager<THub> _lifetimeManager;
    

    而且通过之前的注册来看此成员是一个单例,感觉非常符合,继续查看定义: Microsoft.AspNetCore.SignalR.HubLifetimeManager

    public abstract class HubLifetimeManager<THub> where THub : Hub
    {
        public abstract Task OnConnectedAsync(HubConnectionContext connection);
    
    public abstract Task OnDisconnectedAsync(HubConnectionContext connection);
    
    public abstract Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList&lt;string&gt; excludedConnectionIds, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task SendConnectionsAsync(IReadOnlyList&lt;string&gt; connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task SendGroupsAsync(IReadOnlyList&lt;string&gt; groupNames, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList&lt;string&gt; excludedConnectionIds, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task SendUsersAsync(IReadOnlyList&lt;string&gt; userIds, string methodName, object[] args, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default(CancellationToken));
    
    public abstract Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default(CancellationToken));
    

    }

    到此有点小懵逼了???,这里的方法都是返回Task,但我释放连接需要HubConnectionContext,岂不是无解???

    虽然很像但是不是就很郁闷,既然_lifetimeManager做不了,就只能去看:

    await RunHubAsync(connectionContext);
    

    private async Task RunHubAsync(HubConnectionContext connection)
    {
    try
    {
    await _dispatcher.OnConnectedAsync(connection);
    }
    catch (Exception exception)
    {
    Log.ErrorDispatchingHubEvent(_logger, "OnConnectedAsync", exception);
    await SendCloseAsync(connection, exception, allowReconnect: false);
    return;
    }
    try
    {
    await DispatchMessagesAsync(connection);
    }
    catch (OperationCanceledException)
    {
    }
    catch (Exception exception2)
    {
    Log.ErrorProcessingRequest(_logger, exception2);
    await HubOnDisconnectedAsync(connection, exception2);
    return;
    }
    await HubOnDisconnectedAsync(connection, null);
    }

    一个一个来,先看await _dispatcher.OnConnectedAsync(connection);

    navigation -> Microsoft.AspNetCore.SignalR.Internal.DefaultHubDispatcher

    public override async Task OnConnectedAsync(HubConnectionContext connection)
    {
        IServiceScope scope = null;
        try
        {
            // 通过 service 拿到了THub
            scope = _serviceScopeFactory.CreateScope();
            IHubActivator<THub> hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub>>();
            THub hub = hubActivator.Create();
            try
            {
                // 然后通过hub 和 连接进行初始化
                InitializeHub(hub, connection);
                await hub.OnConnectedAsync();
            }
            finally
            {
                hubActivator.Release(hub);
            }
        }
        finally
        {
            await scope.DisposeAsync();
        }
    }
    

    private void InitializeHub(THub hub, HubConnectionContext connection)
    {
    hub.Clients = new HubCallerClients(_hubContext.Clients, connection.ConnectionId); // 只用到了ConnectionId显然不是
    hub.Context = connection.HubCallerContext; // 这个就有点可疑了
    hub.Groups = _hubContext.Groups;// 只用到了分组应该也不是
    }

    查看HubConnectionContext的构造方法看看HubCallerContext是如何被构造的:

    public HubConnectionContext(ConnectionContext connectionContext, HubConnectionContextOptions contextOptions, ILoggerFactory loggerFactory)
    {
        ...
        HubCallerContext = new DefaultHubCallerContext(this);
        ...
    }
    

    navigation -> Microsoft.AspNetCore.SignalR.Internal.DefaultHubCallerContext

    internal class DefaultHubCallerContext : HubCallerContext
    {
        private readonly HubConnectionContext _connection;
    
    public override string ConnectionId =&gt; _connection.ConnectionId;
    
    public override string UserIdentifier =&gt; _connection.UserIdentifier;
    
    public override ClaimsPrincipal User =&gt; _connection.User;
    
    public override IDictionary&lt;object, object&gt; Items =&gt; _connection.Items;
    
    public override IFeatureCollection Features =&gt; _connection.Features;
    
    public override CancellationToken ConnectionAborted =&gt; _connection.ConnectionAborted;
    
    public DefaultHubCallerContext(HubConnectionContext connection)
    {
        _connection = connection;
    }
    
    public override void Abort()
    {
        // ************************ 
        _connection.Abort();
    }
    

    }

    Abort -> 使中止 推测是中止连接,而且通过源码可知调的是HubConnectionContext.Abort.

    Hub中的定义:

    public HubCallerContext Context
    {
        get
        {
            CheckDisposed();
            return _context;
        }
        set
        {
            CheckDisposed();
            _context = value;
        }
    }
    

    通过测试结果可知,这个便是服务器中断连接的方法了

    [Over~]

  • 相关阅读:
    PTA 天梯赛 L1
    浙江省赛真题2018
    kuangbin专题——简单搜索
    testng.xml 配置大全
    创建testng.xml文件
    TestNG.xml 配置
    Testng 简介
    testng教程之testng.xml的配置和使用,以及参数传递
    jenkins构建:通过testng.xml构建项目
    Jenkins如何集成运行testng.xml文件的解决方案
  • 原文地址:https://www.cnblogs.com/monster17/p/13537129.html
Copyright © 2011-2022 走看看