zoukankan      html  css  js  c++  java
  • SignalR 如何借助redis 实现跨进程通信

    关于 Redis 的发布/订阅(pub/sub)功能,这篇文章讲得比较好:https://redisbook.readthedocs.io/en/latest/feature/pubsub.html

    SignalR 实际上就是使用了 Redis 内置的发布/订阅功能,在不同服务器(进程)之间转发要发送给客户端的消息。

    每个连接都有一个 connectionId。连接成功的时候(OnConnectedAsync),所在的服务器会订阅以这个 connectionId 命名的 Redis 频道,并注册一个回调。
    send 的实质是向这个 connectionId 对应的频道发布一条消息(如果该连接就在本进程内,则直接写入连接,不经过 Redis)。
    这样,即使是在其他进程上 send,消息也会通过 Redis 的订阅机制传给订阅者,然后由订阅者执行回调,把消息写给对应的连接。
    这样就轻松实现了跨进程通信。

    具体可以参考源码:

            /// <summary>
            /// Registers a newly connected client on this server and subscribes to the Redis channels
            /// that carry messages addressed to it.
            /// </summary>
            public override async Task OnConnectedAsync(HubConnectionContext connection)
            {
                // The Redis connection has to exist before any subscription can be made.
                await EnsureRedisServerConnection();
                var feature = new RedisFeature();
                connection.Features.Set<IRedisFeature>(feature);
    
                var connectionTask = Task.CompletedTask;
                var userTask = Task.CompletedTask;
    
                _connections.Add(connection);
    
                // Subscribe to the channel derived from this connection's id.
                connectionTask = SubscribeToConnection(connection);
    
                // Connections that carry a user identifier also subscribe to the user's channel.
                if (!string.IsNullOrEmpty(connection.UserIdentifier))
                {
                    userTask = SubscribeToUser(connection);
                }
    
                // Both subscriptions are started before either one is awaited.
                await Task.WhenAll(connectionTask, userTask);
            }
    

      

            /// <summary>
            /// Sends an invocation to a single connection: written directly when the connection is held
            /// by this server, otherwise published to the connection's Redis channel.
            /// </summary>
            /// <exception cref="ArgumentNullException"><paramref name="connectionId"/> is null.</exception>
            public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
            {
                if (connectionId == null)
                {
                    throw new ArgumentNullException(nameof(connectionId));
                }
    
                // If the connection is local we can skip sending the message through the bus since we require sticky connections.
                // This also saves serializing and deserializing the message!
                var connection = _connections[connectionId];
                if (connection != null)
                {
                    return connection.WriteAsync(new InvocationMessage(methodName, args)).AsTask();
                }
    
                // Not found locally: serialize the invocation and publish it to the connection's channel.
                var message = _protocol.WriteInvocation(methodName, args);
                return PublishAsync(_channels.Connection(connectionId), message);
            }
    

      

    // Copyright (c) .NET Foundation. All rights reserved.
    // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
    
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.SignalR.Protocol;
    using Microsoft.AspNetCore.SignalR.Redis.Internal;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using StackExchange.Redis;
    
    namespace Microsoft.AspNetCore.SignalR.Redis
    {
        public class RedisHubLifetimeManager<THub> : HubLifetimeManager<THub>, IDisposable where THub : Hub
        {
            // Connections that are attached to this server instance.
            private readonly HubConnectionStore _connections = new HubConnectionStore();
            // Per-channel bookkeeping of the local connections subscribed to group channels.
            private readonly RedisSubscriptionManager _groups = new RedisSubscriptionManager();
            // Per-channel bookkeeping of the local connections subscribed to user channels.
            private readonly RedisSubscriptionManager _users = new RedisSubscriptionManager();
            // Created lazily by EnsureRedisServerConnection; null until the first successful connect.
            private IConnectionMultiplexer _redisServerConnection;
            // Subscriber obtained from _redisServerConnection; used for every publish/subscribe call.
            private ISubscriber _bus;
            private readonly ILogger _logger;
            private readonly RedisOptions _options;
            // Builds the channel names (all, connection, group, user, group management, ack).
            private readonly RedisChannels _channels;
            // Unique name of this instance; used for its ack channel and stamped on group commands.
            private readonly string _serverName = GenerateServerName();
            // Serializes and deserializes the payloads exchanged over Redis.
            private readonly RedisProtocol _protocol;
            // Serializes the connection setup performed in EnsureRedisServerConnection.
            private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1);
    
            // Creates the acks awaited by SendGroupActionAndWaitForAck and triggers them when an ack arrives.
            private readonly AckHandler _ackHandler;
            // Counter incremented to give each outgoing group command its id.
            private int _internalId;
    
            /// <summary>
            /// Creates the lifetime manager for <typeparamref name="THub"/> and starts connecting to Redis.
            /// </summary>
            /// <param name="logger">Logger that receives all Redis diagnostics.</param>
            /// <param name="options">Redis options; the wrapped value is used to open the connection.</param>
            /// <param name="hubProtocolResolver">Supplies the hub protocols handed to the Redis protocol.</param>
            public RedisHubLifetimeManager(ILogger<RedisHubLifetimeManager<THub>> logger,
                                           IOptions<RedisOptions> options,
                                           IHubProtocolResolver hubProtocolResolver)
            {
                _logger = logger;
                _options = options.Value;
                _ackHandler = new AckHandler();

                // Channel names are derived from the hub's full type name.
                var hubTypeName = typeof(THub).FullName;
                _channels = new RedisChannels(hubTypeName);

                var hubProtocols = hubProtocolResolver.AllProtocols;
                _protocol = new RedisProtocol(hubProtocols);

                var endPoints = options.Value.Configuration.EndPoints;
                RedisLog.ConnectingToEndpoints(_logger, endPoints, _serverName);

                // Kick off the connection without awaiting it; OnConnectedAsync and PublishAsync
                // await EnsureRedisServerConnection again before they use the connection.
                _ = EnsureRedisServerConnection();
            }
    
            /// <summary>
            /// Registers a newly connected client on this server and subscribes to the Redis channels
            /// that carry messages addressed to it.
            /// </summary>
            /// <param name="connection">The connection that has just been established.</param>
            /// <returns>A task that completes once the connection (and, if any, user) subscription is in place.</returns>
            public override async Task OnConnectedAsync(HubConnectionContext connection)
            {
                await EnsureRedisServerConnection();

                // The feature carries the set of groups this connection has joined.
                connection.Features.Set<IRedisFeature>(new RedisFeature());
                _connections.Add(connection);

                var connectionSubscription = SubscribeToConnection(connection);

                // Only connections with a user identifier need the user channel.
                var userSubscription = string.IsNullOrEmpty(connection.UserIdentifier)
                    ? Task.CompletedTask
                    : SubscribeToUser(connection);

                await Task.WhenAll(connectionSubscription, userSubscription);
            }
    
            /// <summary>
            /// Removes a disconnected connection from this server and unsubscribes from the Redis channels
            /// that were subscribed on its behalf (connection channel, group channels, user channel).
            /// </summary>
            /// <param name="connection">The connection that has disconnected.</param>
            /// <returns>A task that completes when all unsubscribe operations have finished.</returns>
            public override Task OnDisconnectedAsync(HubConnectionContext connection)
            {
                _connections.Remove(connection);

                var tasks = new List<Task>();

                var connectionChannel = _channels.Connection(connection.ConnectionId);
                RedisLog.Unsubscribe(_logger, connectionChannel);
                tasks.Add(_bus.UnsubscribeAsync(connectionChannel));

                // OnConnectedAsync attaches the feature only after the Redis connection has been ensured,
                // so tolerate a connection that never received one instead of dereferencing null.
                var groupNames = connection.Features.Get<IRedisFeature>()?.Groups;

                if (groupNames != null)
                {
                    // Take the snapshot under the same lock AddGroupAsyncCore and RemoveGroupAsyncCore hold
                    // while mutating the set: a HashSet must not be enumerated while another thread
                    // changes it. The copy is also required because RemoveGroupAsyncCore removes
                    // entries from this very collection.
                    string[] groupSnapshot;
                    lock (groupNames)
                    {
                        groupSnapshot = groupNames.ToArray();
                    }

                    foreach (var group in groupSnapshot)
                    {
                        // Use RemoveGroupAsyncCore because the connection is local and we don't want to
                        // accidentally go to other servers with our remove request.
                        tasks.Add(RemoveGroupAsyncCore(connection, group));
                    }
                }

                if (!string.IsNullOrEmpty(connection.UserIdentifier))
                {
                    tasks.Add(RemoveUserAsync(connection));
                }

                return Task.WhenAll(tasks);
            }
    
            /// <summary>
            /// Sends an invocation to every connection by publishing it to the shared "all" channel.
            /// </summary>
            /// <param name="methodName">Name of the client method to invoke.</param>
            /// <param name="args">Arguments passed to the client method.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            public override Task SendAllAsync(string methodName, object[] args, CancellationToken cancellationToken = default)
            {
                var payload = _protocol.WriteInvocation(methodName, args);
                var allChannel = _channels.All;

                return PublishAsync(allChannel, payload);
            }
    
            /// <summary>
            /// Sends an invocation to every connection except the listed ones. The exclusion list travels
            /// inside the payload published to the shared "all" channel.
            /// </summary>
            /// <param name="methodName">Name of the client method to invoke.</param>
            /// <param name="args">Arguments passed to the client method.</param>
            /// <param name="excludedConnectionIds">Connection ids that must not receive the message.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
            {
                var payload = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
                var allChannel = _channels.All;

                return PublishAsync(allChannel, payload);
            }
    
            /// <summary>
            /// Sends an invocation to a single connection: written directly when the connection is held
            /// by this server, otherwise published to the connection's Redis channel.
            /// </summary>
            /// <param name="connectionId">Id of the target connection.</param>
            /// <param name="methodName">Name of the client method to invoke.</param>
            /// <param name="args">Arguments passed to the client method.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            /// <exception cref="ArgumentNullException"><paramref name="connectionId"/> is null.</exception>
            public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
            {
                if (connectionId == null)
                {
                    throw new ArgumentNullException(nameof(connectionId));
                }

                var localConnection = _connections[connectionId];
                if (localConnection == null)
                {
                    // Not held by this server: go through the bus.
                    var payload = _protocol.WriteInvocation(methodName, args);
                    return PublishAsync(_channels.Connection(connectionId), payload);
                }

                // Local connection: skip the bus (sticky connections are required), which also avoids
                // serializing and deserializing the message.
                var invocation = new InvocationMessage(methodName, args);
                return localConnection.WriteAsync(invocation).AsTask();
            }
    
            /// <summary>
            /// Sends an invocation to a group by publishing it to the group's channel.
            /// </summary>
            /// <param name="groupName">Name of the target group.</param>
            /// <param name="methodName">Name of the client method to invoke.</param>
            /// <param name="args">Arguments passed to the client method.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            /// <exception cref="ArgumentNullException"><paramref name="groupName"/> is null.</exception>
            public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default)
            {
                if (groupName == null)
                {
                    throw new ArgumentNullException(nameof(groupName));
                }

                var payload = _protocol.WriteInvocation(methodName, args);
                var groupChannel = _channels.Group(groupName);

                return PublishAsync(groupChannel, payload);
            }
    
            /// <summary>
            /// Sends an invocation to a group, skipping the listed connections. The exclusion list travels
            /// inside the payload published to the group's channel.
            /// </summary>
            /// <param name="groupName">Name of the target group.</param>
            /// <param name="methodName">Name of the client method to invoke.</param>
            /// <param name="args">Arguments passed to the client method.</param>
            /// <param name="excludedConnectionIds">Connection ids that must not receive the message.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            /// <exception cref="ArgumentNullException"><paramref name="groupName"/> is null.</exception>
            public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList<string> excludedConnectionIds, CancellationToken cancellationToken = default)
            {
                if (groupName == null)
                {
                    throw new ArgumentNullException(nameof(groupName));
                }

                var payload = _protocol.WriteInvocation(methodName, args, excludedConnectionIds);
                var groupChannel = _channels.Group(groupName);

                return PublishAsync(groupChannel, payload);
            }
    
            /// <summary>
            /// Sends an invocation to a user by publishing it to the user's channel.
            /// </summary>
            /// <param name="userId">Identifier of the target user.</param>
            /// <param name="methodName">Name of the client method to invoke.</param>
            /// <param name="args">Arguments passed to the client method.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default)
            {
                var payload = _protocol.WriteInvocation(methodName, args);
                var userChannel = _channels.User(userId);

                return PublishAsync(userChannel, payload);
            }
    
            /// <summary>
            /// Adds a connection to a group. A connection held by this server is handled directly;
            /// otherwise an "add" group command is published and the call waits for its ack.
            /// </summary>
            /// <param name="connectionId">Id of the connection to add.</param>
            /// <param name="groupName">Name of the group to join.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            /// <exception cref="ArgumentNullException">
            /// <paramref name="connectionId"/> or <paramref name="groupName"/> is null.
            /// </exception>
            public override Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
            {
                if (connectionId == null)
                {
                    throw new ArgumentNullException(nameof(connectionId));
                }

                if (groupName == null)
                {
                    throw new ArgumentNullException(nameof(groupName));
                }

                var localConnection = _connections[connectionId];

                // Short circuit when the connection is on this server; ask the other servers otherwise.
                return localConnection != null
                    ? AddGroupAsyncCore(localConnection, groupName)
                    : SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Add);
            }
    
            /// <summary>
            /// Removes a connection from a group. A connection held by this server is handled directly;
            /// otherwise a "remove" group command is published and the call waits for its ack.
            /// </summary>
            /// <param name="connectionId">Id of the connection to remove.</param>
            /// <param name="groupName">Name of the group to leave.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            /// <exception cref="ArgumentNullException">
            /// <paramref name="connectionId"/> or <paramref name="groupName"/> is null.
            /// </exception>
            public override Task RemoveFromGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default)
            {
                if (connectionId == null)
                {
                    throw new ArgumentNullException(nameof(connectionId));
                }

                if (groupName == null)
                {
                    throw new ArgumentNullException(nameof(groupName));
                }

                var localConnection = _connections[connectionId];

                // Short circuit when the connection is on this server; ask the other servers otherwise.
                return localConnection != null
                    ? RemoveGroupAsyncCore(localConnection, groupName)
                    : SendGroupActionAndWaitForAck(connectionId, groupName, GroupAction.Remove);
            }
    
            /// <summary>
            /// Sends one invocation to several connections by publishing the same payload to each
            /// connection's channel.
            /// </summary>
            /// <param name="connectionIds">Ids of the target connections.</param>
            /// <param name="methodName">Name of the client method to invoke.</param>
            /// <param name="args">Arguments passed to the client method.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            /// <exception cref="ArgumentNullException"><paramref name="connectionIds"/> is null.</exception>
            public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
            {
                if (connectionIds == null)
                {
                    throw new ArgumentNullException(nameof(connectionIds));
                }

                // Serialize once; the same bytes are published to every channel.
                var payload = _protocol.WriteInvocation(methodName, args);

                var publishes = connectionIds
                    .Select(id => PublishAsync(_channels.Connection(id), payload))
                    .ToArray();

                return Task.WhenAll(publishes);
            }
    
            /// <summary>
            /// Sends one invocation to several groups by publishing the same payload to each group's
            /// channel. Null or empty group names are skipped.
            /// </summary>
            /// <param name="groupNames">Names of the target groups.</param>
            /// <param name="methodName">Name of the client method to invoke.</param>
            /// <param name="args">Arguments passed to the client method.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            /// <exception cref="ArgumentNullException"><paramref name="groupNames"/> is null.</exception>
            public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default)
            {
                if (groupNames == null)
                {
                    throw new ArgumentNullException(nameof(groupNames));
                }

                // Serialize once; the same bytes are published to every channel.
                var payload = _protocol.WriteInvocation(methodName, args);

                var publishes = groupNames
                    .Where(name => !string.IsNullOrEmpty(name))
                    .Select(name => PublishAsync(_channels.Group(name), payload))
                    .ToArray();

                return Task.WhenAll(publishes);
            }
    
            /// <summary>
            /// Sends one invocation to several users by publishing the same payload to each user's
            /// channel. Null or empty user ids are skipped.
            /// </summary>
            /// <param name="userIds">Identifiers of the target users.</param>
            /// <param name="methodName">Name of the client method to invoke.</param>
            /// <param name="args">Arguments passed to the client method.</param>
            /// <param name="cancellationToken">Not used by this implementation.</param>
            /// <returns>
            /// A task that completes when every publish has finished; already completed when
            /// <paramref name="userIds"/> is empty.
            /// </returns>
            /// <exception cref="ArgumentNullException"><paramref name="userIds"/> is null.</exception>
            public override Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
            {
                // Fail with the same exception type as SendConnectionsAsync and SendGroupsAsync instead
                // of a NullReferenceException on userIds.Count.
                if (userIds == null)
                {
                    throw new ArgumentNullException(nameof(userIds));
                }

                if (userIds.Count == 0)
                {
                    // Nothing to send; skip serializing the invocation.
                    return Task.CompletedTask;
                }

                // Serialize once; the same bytes are published to every channel.
                var payload = _protocol.WriteInvocation(methodName, args);
                var publishTasks = new List<Task>(userIds.Count);
                foreach (var userId in userIds)
                {
                    if (!string.IsNullOrEmpty(userId))
                    {
                        publishTasks.Add(PublishAsync(_channels.User(userId), payload));
                    }
                }

                return Task.WhenAll(publishTasks);
            }
    
            /// <summary>
            /// Publishes a serialized payload to a Redis channel, first making sure the Redis connection
            /// has been set up.
            /// </summary>
            /// <param name="channel">Name of the channel to publish to.</param>
            /// <param name="payload">Serialized message bytes.</param>
            private async Task PublishAsync(string channel, byte[] payload)
            {
                await EnsureRedisServerConnection();

                RedisLog.PublishToChannel(_logger, channel);

                var publish = _bus.PublishAsync(channel, payload);
                await publish;
            }
    
            /// <summary>
            /// Adds a local connection to a group: records the group on the connection's feature and
            /// registers the connection with the group's channel subscription.
            /// </summary>
            /// <param name="connection">A connection held by this server.</param>
            /// <param name="groupName">Name of the group to join.</param>
            /// <returns>
            /// An already completed task when the connection was in the group before; otherwise the
            /// task of the subscription registration.
            /// </returns>
            private Task AddGroupAsyncCore(HubConnectionContext connection, string groupName)
            {
                var joinedGroups = connection.Features.Get<IRedisFeature>().Groups;

                bool newlyJoined;
                lock (joinedGroups)
                {
                    newlyJoined = joinedGroups.Add(groupName);
                }

                if (!newlyJoined)
                {
                    // Connection already in group
                    return Task.CompletedTask;
                }

                return _groups.AddSubscriptionAsync(_channels.Group(groupName), connection, SubscribeToGroupAsync);
            }
    
            /// <summary>
            /// Removes a local connection from a group: unregisters it from the group's channel
            /// subscription, then drops the group from the connection's feature.
            /// This takes <see cref="HubConnectionContext"/> (rather than a connection id) so that
            /// OnDisconnectedAsync can remove the connection from the _connections list first and
            /// still clean up its groups with this method.
            /// </summary>
            /// <param name="connection">A connection held by this server.</param>
            /// <param name="groupName">Name of the group to leave.</param>
            private async Task RemoveGroupAsyncCore(HubConnectionContext connection, string groupName)
            {
                await _groups.RemoveSubscriptionAsync(_channels.Group(groupName), connection, channelName =>
                {
                    RedisLog.Unsubscribe(_logger, channelName);
                    return _bus.UnsubscribeAsync(channelName);
                });

                var joinedGroups = connection.Features.Get<IRedisFeature>().Groups;
                if (joinedGroups == null)
                {
                    return;
                }

                lock (joinedGroups)
                {
                    joinedGroups.Remove(groupName);
                }
            }
    
            /// <summary>
            /// Publishes an add/remove group command for a connection held by another server and waits
            /// until the ack created for the command completes.
            /// </summary>
            /// <param name="connectionId">Id of the connection the command applies to.</param>
            /// <param name="groupName">Name of the group to join or leave.</param>
            /// <param name="action">Whether to add or remove the connection.</param>
            private async Task SendGroupActionAndWaitForAck(string connectionId, string groupName, GroupAction action)
            {
                var commandId = Interlocked.Increment(ref _internalId);

                // Register the ack before publishing so it exists by the time a reply can arrive.
                var pendingAck = _ackHandler.CreateAck(commandId);

                // Send Add/Remove Group to other servers and wait for an ack or timeout
                var command = new RedisGroupCommand(commandId, _serverName, action, groupName, connectionId);
                var payload = _protocol.WriteGroupCommand(command);
                await PublishAsync(_channels.GroupManagement, payload);

                await pendingAck;
            }
    
            /// <summary>
            /// Unregisters a connection from its user's channel subscription. The callback that
            /// unsubscribes the channel from Redis is handed to the subscription manager.
            /// </summary>
            /// <param name="connection">A connection that has a user identifier.</param>
            private Task RemoveUserAsync(HubConnectionContext connection)
            {
                return _users.RemoveSubscriptionAsync(
                    _channels.User(connection.UserIdentifier),
                    connection,
                    channelName =>
                    {
                        RedisLog.Unsubscribe(_logger, channelName);
                        return _bus.UnsubscribeAsync(channelName);
                    });
            }
    
            /// <summary>
            /// Drops all Redis subscriptions, disposes the Redis connection (when one was created) and
            /// disposes the ack handler.
            /// </summary>
            public void Dispose()
            {
                var bus = _bus;
                if (bus != null)
                {
                    bus.UnsubscribeAll();
                }

                var redisConnection = _redisServerConnection;
                if (redisConnection != null)
                {
                    redisConnection.Dispose();
                }

                _ackHandler.Dispose();
            }
    
            /// <summary>
            /// Subscribes to the shared "all" channel. Each received invocation is written to every
            /// local connection that is not on the message's exclusion list.
            /// </summary>
            private Task SubscribeToAll()
            {
                RedisLog.Subscribing(_logger, _channels.All);

                return _bus.SubscribeAsync(_channels.All, async (channel, payload) =>
                {
                    try
                    {
                        RedisLog.ReceivedFromChannel(_logger, _channels.All);

                        var invocation = _protocol.ReadInvocation((byte[])payload);
                        var excludedIds = invocation.ExcludedConnectionIds;

                        var writes = new List<Task>(_connections.Count);
                        foreach (var connection in _connections)
                        {
                            if (excludedIds != null && excludedIds.Contains(connection.ConnectionId))
                            {
                                continue;
                            }

                            writes.Add(connection.WriteAsync(invocation.Message).AsTask());
                        }

                        await Task.WhenAll(writes);
                    }
                    catch (Exception ex)
                    {
                        // Log instead of letting the failure escape the subscription callback.
                        RedisLog.FailedWritingMessage(_logger, ex);
                    }
                });
            }
    
            /// <summary>
            /// Subscribes to the group management channel. When a command targets a connection held by
            /// this server, the add/remove is applied locally and an ack is published to the channel of
            /// the server that sent the command.
            /// </summary>
            private Task SubscribeToGroupManagementChannel()
            {
                return _bus.SubscribeAsync(_channels.GroupManagement, async (channel, payload) =>
                {
                    try
                    {
                        var command = _protocol.ReadGroupCommand((byte[])payload);

                        var target = _connections[command.ConnectionId];
                        if (target == null)
                        {
                            // user not on this server
                            return;
                        }

                        var action = command.Action;
                        if (action == GroupAction.Remove)
                        {
                            await RemoveGroupAsyncCore(target, command.GroupName);
                        }
                        else if (action == GroupAction.Add)
                        {
                            await AddGroupAsyncCore(target, command.GroupName);
                        }

                        // Send an ack to the server that sent the original command.
                        var ackChannel = _channels.Ack(command.ServerName);
                        await PublishAsync(ackChannel, _protocol.WriteAck(command.Id));
                    }
                    catch (Exception ex)
                    {
                        RedisLog.InternalMessageFailed(_logger, ex);
                    }
                });
            }
    
            /// <summary>
            /// Subscribes to this server's own ack channel and triggers the matching pending ack for
            /// every ack id received on it.
            /// </summary>
            private Task SubscribeToAckChannel()
            {
                // Create server specific channel in order to send an ack to a single server
                var ackChannel = _channels.Ack(_serverName);

                return _bus.SubscribeAsync(ackChannel, (channel, payload) =>
                {
                    var receivedAckId = _protocol.ReadAck((byte[])payload);
                    _ackHandler.TriggerAck(receivedAckId);
                });
            }
    
            /// <summary>
            /// Subscribes to the Redis channel of a single connection. Every invocation received on the
            /// channel is written to that connection.
            /// </summary>
            /// <param name="connection">The local connection whose channel is subscribed.</param>
            /// <returns>A task that completes when the subscription has been registered.</returns>
            private Task SubscribeToConnection(HubConnectionContext connection)
            {
                var connectionChannel = _channels.Connection(connection.ConnectionId);

                RedisLog.Subscribing(_logger, connectionChannel);
                return _bus.SubscribeAsync(connectionChannel, async (c, data) =>
                {
                    // The async handler is handed to the subscriber as a callback and nothing awaits it,
                    // so a malformed payload or a failed write must be caught here. Log it the same way
                    // the all/user/group handlers do.
                    try
                    {
                        var invocation = _protocol.ReadInvocation((byte[])data);
                        await connection.WriteAsync(invocation.Message);
                    }
                    catch (Exception ex)
                    {
                        RedisLog.FailedWritingMessage(_logger, ex);
                    }
                });
            }
    
            /// <summary>
            /// Registers a connection with its user's channel subscription. The callback handed to the
            /// subscription manager subscribes the channel in Redis; each received invocation is written
            /// to every connection the manager tracks for that channel.
            /// </summary>
            /// <param name="connection">A connection that has a user identifier.</param>
            private Task SubscribeToUser(HubConnectionContext connection)
            {
                return _users.AddSubscriptionAsync(
                    _channels.User(connection.UserIdentifier),
                    connection,
                    (channelName, userConnections) =>
                    {
                        RedisLog.Subscribing(_logger, channelName);
                        return _bus.SubscribeAsync(channelName, async (channel, payload) =>
                        {
                            try
                            {
                                var invocation = _protocol.ReadInvocation((byte[])payload);

                                var writes = new List<Task>();
                                foreach (var target in userConnections)
                                {
                                    writes.Add(target.WriteAsync(invocation.Message).AsTask());
                                }

                                await Task.WhenAll(writes);
                            }
                            catch (Exception ex)
                            {
                                // Log instead of letting the failure escape the subscription callback.
                                RedisLog.FailedWritingMessage(_logger, ex);
                            }
                        });
                    });
            }
    
            /// <summary>
            /// Subscribes to a group's channel. Each received invocation is written to every connection
            /// in <paramref name="groupConnections"/> that is not on the message's exclusion list.
            /// </summary>
            /// <param name="groupChannel">Name of the group's Redis channel.</param>
            /// <param name="groupConnections">The local connections registered for that channel.</param>
            private Task SubscribeToGroupAsync(string groupChannel, HubConnectionStore groupConnections)
            {
                RedisLog.Subscribing(_logger, groupChannel);

                return _bus.SubscribeAsync(groupChannel, async (channel, payload) =>
                {
                    try
                    {
                        var invocation = _protocol.ReadInvocation((byte[])payload);
                        var excludedIds = invocation.ExcludedConnectionIds;

                        var writes = new List<Task>();
                        foreach (var member in groupConnections)
                        {
                            if (excludedIds == null || !excludedIds.Contains(member.ConnectionId))
                            {
                                writes.Add(member.WriteAsync(invocation.Message).AsTask());
                            }
                        }

                        await Task.WhenAll(writes);
                    }
                    catch (Exception ex)
                    {
                        // Log instead of letting the failure escape the subscription callback.
                        RedisLog.FailedWritingMessage(_logger, ex);
                    }
                });
            }
    
            /// <summary>
            /// Connects to Redis on first use and registers the server-wide subscriptions (the "all"
            /// channel, the group management channel and this server's ack channel). Calls made after
            /// the connection field has been set return without taking the lock.
            /// </summary>
            private async Task EnsureRedisServerConnection()
            {
                // The field is checked once without the lock and again after acquiring it, so callers
                // that were waiting on the lock do not repeat the setup.
                if (_redisServerConnection == null)
                {
                    await _connectionLock.WaitAsync();
                    try
                    {
                        if (_redisServerConnection == null)
                        {
                            // Messages written by the connect call are forwarded to the logger.
                            var writer = new LoggerTextWriter(_logger);
                            // NOTE(review): the connection field is assigned here, before _bus is set
                            // and before the subscriptions below complete; a concurrent caller that
                            // sees it non-null skips the lock. TODO confirm callers cannot observe
                            // this partially initialized state.
                            _redisServerConnection = await _options.ConnectAsync(writer);
                            _bus = _redisServerConnection.GetSubscriber();
    
                            _redisServerConnection.ConnectionRestored += (_, e) =>
                            {
                                // We use the subscription connection type
                                // Ignore messages from the interactive connection (avoids duplicates)
                                if (e.ConnectionType == ConnectionType.Interactive)
                                {
                                    return;
                                }
    
                                RedisLog.ConnectionRestored(_logger);
                            };
    
                            _redisServerConnection.ConnectionFailed += (_, e) =>
                            {
                                // We use the subscription connection type
                                // Ignore messages from the interactive connection (avoids duplicates)
                                if (e.ConnectionType == ConnectionType.Interactive)
                                {
                                    return;
                                }
    
                                RedisLog.ConnectionFailed(_logger, e.Exception);
                            };
    
                            // Log the connection state observed right after connecting.
                            if (_redisServerConnection.IsConnected)
                            {
                                RedisLog.Connected(_logger);
                            }
                            else
                            {
                                RedisLog.NotConnected(_logger);
                            }
    
                            // Server-wide subscriptions, registered one after another.
                            await SubscribeToAll();
                            await SubscribeToGroupManagementChannel();
                            await SubscribeToAckChannel();
                        }
                    }
                    finally
                    {
                        // Always release the lock, even when connecting or subscribing throws.
                        _connectionLock.Release();
                    }
                }
            }
    
            /// <summary>
            /// Builds a unique name for this server instance: the machine name, an underscore, and a
            /// new GUID formatted as 32 hex digits without dashes.
            /// </summary>
            /// <returns>For example <c>MyServerName_02db60e5fab243b890a847fa5c4dcb29</c>.</returns>
            private static string GenerateServerName()
            {
                // The machine name keeps diagnostics readable; the GUID makes the name unique.
                var uniqueSuffix = Guid.NewGuid().ToString("N");
                return Environment.MachineName + "_" + uniqueSuffix;
            }
    
            /// <summary>
            /// A <see cref="TextWriter"/> that forwards complete lines to the logger. It is handed to
            /// the Redis connect call so that its output ends up in the log.
            /// </summary>
            private class LoggerTextWriter : TextWriter
            {
                private readonly ILogger _logger;

                public LoggerTextWriter(ILogger logger) => _logger = logger;

                public override Encoding Encoding => Encoding.UTF8;

                public override void Write(char value)
                {
                    // Intentionally empty: single characters are dropped, only lines passed to
                    // WriteLine(string) are logged.
                }

                public override void WriteLine(string value) =>
                    RedisLog.ConnectionMultiplexerMessage(_logger, value);
            }
    
            /// <summary>
            /// Per-connection feature stored in the connection's feature collection by OnConnectedAsync.
            /// </summary>
            private interface IRedisFeature
            {
                /// <summary>Names of the groups the connection has joined; access is guarded by locking the set itself.</summary>
                HashSet<string> Groups { get; }
            }
    
            /// <summary>
            /// Default <see cref="IRedisFeature"/> implementation; group names are compared
            /// case-insensitively (ordinal).
            /// </summary>
            private class RedisFeature : IRedisFeature
            {
                public HashSet<string> Groups { get; } = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
            }
        }
    }
    

      

  • 相关阅读:
    http请求类型简介
    关于PLSQL连接报错:ORA-12154:TNS:无法解析指定的连接标识符
    JDK8的安装与配置
    今天新装tomcat遇到黑窗口(startup.bat)启动乱码问题解决!!!
    一个简单的工厂模式(一个接口,多个实现,通过调用条件的不同,分别去调用符合的实现)
    数组(复习)
    java选择结构、循环结构(复习)
    java常用的数据类型,变量和常量,运算符(复习)
    java输入输出,书写规范,运行原理,跨平台原理(复习)
    JAVA基础入门(JDK、eclipse下载安装)
  • 原文地址:https://www.cnblogs.com/zendu/p/13898869.html
Copyright © 2011-2022 走看看