zoukankan      html  css  js  c++  java
  • 企业级工作流解决方案(九)--微服务Tcp消息传输模型之客户端处理

    客户端启动

      客户端启动主要做三件事情:1. 从配置文件读取服务调用配置,存储到全局对象中;2. 指定客户端编解码器工厂;3. 预连接,即预先建立与服务端的通信Channel。

    /// <summary>
        /// ABP module that bootstraps the JSON-RPC client: it loads the client configuration,
        /// registers the message codec factory and the transport client factory, and finally
        /// pre-connects to the configured TCP servers.
        /// </summary>
        [DependsOn(typeof(AbpKernelModule))]
        public class JsonRpcClientModule : AbpModule
        {
            /// <summary>
            /// Reads <c>SocketClientConfiguration.xml</c>, registers the result as the
            /// <see cref="ISocketClientConfiguration"/> instance and registers the codec factory
            /// that matches the configured <c>MessageCode</c>.
            /// </summary>
            public override void PreInitialize()
            {
                // Register the client configuration; it is always read from the XML file.
                SocketClientConfiguration socketClientConfiguration = XmlConfigProvider.GetConfig<SocketClientConfiguration>("SocketClientConfiguration.xml");
                IocManager.IocContainer.Register(
                    Component
                        .For<ISocketClientConfiguration>()
                        .Instance(socketClientConfiguration)
                );

                // Pick the codec factory for the configured wire format. No factory is registered
                // here for any other MessageCode value.
                switch (socketClientConfiguration.MessageCode)
                {
                    case EMessageCode.Json:
                        IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                        break;
                    case EMessageCode.MessagePack:
                        IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                        break;
                    case EMessageCode.ProtoBuffer:
                        IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                        break;
                }
            }

            /// <summary>
            /// Registers this assembly's types by convention and registers a single
            /// <see cref="DotNettyTransportClientFactory"/> instance as <see cref="ITransportClientFactory"/>.
            /// </summary>
            public override void Initialize()
            {
                IocManager.RegisterAssemblyByConvention(typeof(JsonRpcClientModule).GetAssembly());
                var dotNettyTransportClientFactory = new DotNettyTransportClientFactory(IocManager.Resolve<ITransportMessageCodecFactory>(), Logger);

                IocManager.IocContainer.Register(
                    Component
                        .For<ITransportClientFactory>()
                        .Instance(dotNettyTransportClientFactory)
                );
            }

            /// <summary>
            /// Pre-connects to every configured TCP server. Pre-connection is best effort: a failure
            /// for one server is logged as a warning and does not prevent the remaining servers
            /// from being tried, nor does it fail module start-up.
            /// </summary>
            public override void PostInitialize()
            {
                var socketClientConfiguration = Configuration.Modules.RpcClientConfig();
                var transportClientFactory = IocManager.Resolve<ITransportClientFactory>();

                var serverInfos = socketClientConfiguration.ClientConnectServerInfos;
                if (serverInfos == null)
                {
                    // Nothing to pre-connect to; a missing list was silently tolerated before as well.
                    return;
                }

                foreach (var clientConnectServerInfo in serverInfos)
                {
                    // The try block is per server so that one malformed or unreachable entry
                    // cannot abort pre-connection to the servers listed after it.
                    try
                    {
                        if (clientConnectServerInfo.ConnectServerType != EConnectServerType.Tcp)
                        {
                            continue;
                        }

                        // Url is expected in "host:port" form.
                        var tcpAddress = clientConnectServerInfo.Url.Split(new char[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
                        transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint());
                    }
                    catch (Exception ex)
                    {
                        // Best effort only: record the failure instead of swallowing it silently.
                        Logger.Warn($"预连接服务端失败:{clientConnectServerInfo?.Url}", ex);
                    }
                }
            }
        }

    客户端全局Channel设计

      每一个服务连接创建一个TransportClient与之对应,存储在全局变量中private readonly ConcurrentDictionary<EndPoint, Lazy<ITransportClient>> _clients = new ConcurrentDictionary<EndPoint, Lazy<ITransportClient>>();

      TransportClient即处理客户端传输消息的对象。每当发起客户端调用时,创建TransportClient对象并存储到_clients集合中;下次对同一个服务端调用时,直接复用此对象。如果与服务器的通信Channel断开,则将其从_clients集合中移除,从而达到复用Channel的目的。

    /// <summary>
            /// Returns the transport client for the given endpoint, creating and caching it in
            /// <c>_clients</c> on first use.
            /// </summary>
            /// <param name="endPoint">Server endpoint to connect to.</param>
            /// <returns>The transport client instance bound to <paramref name="endPoint"/>.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="endPoint"/> is null.</exception>
            public ITransportClient CreateClient(EndPoint endPoint)
            {
                if (endPoint == null)
                {
                    throw new ArgumentNullException(nameof(endPoint));
                }

                var key = endPoint;
                _logger.Debug($"准备为服务端地址:{key}创建客户端。");
                try
                {
                    // The dictionary stores a Lazy so the connection is only established when
                    // .Value is read on the entry that actually ended up in the dictionary.
                    return _clients.GetOrAdd(key
                        , k => new Lazy<ITransportClient>(() =>
                        {
                            var bootstrap = _bootstrap;
                            // Blocks the calling thread until the connect attempt completes.
                            var channel = bootstrap.ConnectAsync(k).Result;

                            // Attach the listener, the sender and the original endpoint to the channel
                            // so that channel handlers can look them up by attribute key.
                            var messageListener = new MessageListener();
                            channel.GetAttribute(messageListenerKey).Set(messageListener);
                            var messageSender = new DotNettyMessageClientSender(_transportMessageEncoder, channel);
                            channel.GetAttribute(messageSenderKey).Set(messageSender);
                            channel.GetAttribute(origEndPointKey).Set(k);

                            var client = new TransportClient(messageSender, messageListener, _logger);
                            return client;
                        }
                        )).Value;
                }
                catch
                {
                    // Drop the cached entry so that a later call builds a fresh client
                    // instead of reusing the failed one, then let the caller see the failure.
                    _clients.TryRemove(key, out _);
                    throw;
                }
            }
    /// <summary>
            /// Channel handler that evicts the cached transport client of a channel once that
            /// channel becomes inactive, so the next request for the endpoint creates a new client.
            /// </summary>
            protected class DefaultChannelHandler : ChannelHandlerAdapter
            {
              
                /// <summary>
                /// Removes the client registered for the channel's original endpoint from the
                /// factory's client cache.
                /// </summary>
                /// <param name="context">Handler context of the channel that became inactive.</param>
                public override void ChannelInactive(IChannelHandlerContext context)
                {
                    var endPoint = context.Channel.GetAttribute(origEndPointKey).Get();

                    // The attribute is only set after the connect has completed; without a value
                    // there is no cache entry to remove, and a null key would make TryRemove throw.
                    if (endPoint != null)
                    {
                        _factory._clients.TryRemove(endPoint, out _);
                    }

                    // NOTE(review): the event is not forwarded to the next handler (no call to
                    // base.ChannelInactive) — same as before; confirm this is intended.
                }
            }

    TransportClient

      默认的客户端传输实现,Rpc调用时,直接组装请求参数,调用SendAsync方法。注意里面的ManualResetValueTaskSource的设计。

    /// <summary>
        /// Default transport client implementation. Sends an invoke message through the message
        /// sender and completes the matching pending callback when the message listener reports
        /// a message with the same id.
        /// </summary>
        public class TransportClient : ITransportClient, IDisposable
        {
            #region Field

            private readonly IMessageSender _messageSender;
            private readonly IMessageListener _messageListener;
            private readonly ILogger _logger;

            // Pending requests keyed by transport message id; each entry is completed by
            // MessageListener_Received or cancelled when the request is abandoned.
            private readonly ConcurrentDictionary<string, ManualResetValueTaskSource<TransportMessage>> _resultDictionary =
                new ConcurrentDictionary<string, ManualResetValueTaskSource<TransportMessage>>();

            #endregion Field

            #region Constructor

            /// <summary>
            /// Creates the client and subscribes it to the listener's <c>Received</c> event.
            /// </summary>
            /// <param name="messageSender">Sender used to write messages.</param>
            /// <param name="messageListener">Listener that raises <c>Received</c> for incoming messages.</param>
            /// <param name="logger">Logger for diagnostic output.</param>
            public TransportClient(IMessageSender messageSender, IMessageListener messageListener, ILogger logger)
            {
                _messageSender = messageSender;
                _messageListener = messageListener;
                _logger = logger;
                messageListener.Received += MessageListener_Received;
            }

            #endregion Constructor

            #region Implementation of ITransportClient

            /// <summary>
            /// Sends a request and waits for the matching response.
            /// </summary>
            /// <param name="message">The remote invocation request.</param>
            /// <param name="contextNameValueCollection">Context values passed to <c>TransportMessage.CreateInvokeMessage</c> together with the request.</param>
            /// <param name="cancellationToken">Token passed on to the wait for the response.</param>
            /// <returns>The response extracted from the received transport message.</returns>
            /// <exception cref="CommunicationException">Sending the message failed.</exception>
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public async Task<JsonResponse> SendAsync(JsonRequest message, NameValueCollection contextNameValueCollection, CancellationToken cancellationToken)
            {
                try
                {
                    _logger.Debug("准备发送消息。");

                    var transportMessage = TransportMessage.CreateInvokeMessage(message, contextNameValueCollection);

                    // Register the result callback before sending, so a fast response cannot be missed.
                    var callbackTask = RegisterResultCallbackAsync(transportMessage.Id, cancellationToken);

                    try
                    {
                        // Send the message.
                        await _messageSender.SendAndFlushAsync(transportMessage);
                    }
                    catch (Exception exception)
                    {
                        // The message never left, so no response will arrive. Remove and cancel the
                        // pending callback; otherwise the entry would stay in the dictionary until
                        // the caller's token is cancelled (possibly forever).
                        if (_resultDictionary.TryRemove(transportMessage.Id, out var pending))
                        {
                            pending.SetCanceled();
                        }

                        throw new CommunicationException("与服务端通讯时发生了异常。", exception);
                    }

                    _logger.Debug("消息发送成功。");

                    return await callbackTask;
                }
                catch (Exception exception)
                {
                    // Keep the failure details in the log; the exception itself is rethrown unchanged.
                    _logger.Error("消息发送失败。", exception);
                    throw;
                }
            }

            #endregion Implementation of ITransportClient

            #region Implementation of IDisposable

            /// <summary>
            /// Unsubscribes from the listener, disposes the sender and the listener when they are
            /// disposable, and cancels every request that is still pending.
            /// </summary>
            public void Dispose()
            {
                // Stop receiving messages first so the listener no longer references this client.
                _messageListener.Received -= MessageListener_Received;

                (_messageSender as IDisposable)?.Dispose();
                (_messageListener as IDisposable)?.Dispose();
                foreach (var taskCompletionSource in _resultDictionary.Values)
                {
                    taskCompletionSource.SetCanceled();
                }
            }

            #endregion Implementation of IDisposable

            #region Private Method

            /// <summary>
            /// Registers a pending callback for the given message id and waits for its result.
            /// </summary>
            /// <param name="id">Message id.</param>
            /// <param name="cancellationToken">Token passed to the wait on the pending callback.</param>
            /// <returns>The response contained in the received transport message.</returns>
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            private async Task<JsonResponse> RegisterResultCallbackAsync(string id, CancellationToken cancellationToken)
            {
                _logger.Debug($"准备获取Id为:{id}的响应内容。");

                var task = new ManualResetValueTaskSource<TransportMessage>();
                _resultDictionary.TryAdd(id, task);
                try
                {
                    var result = await task.AwaitValue(cancellationToken);
                    return result.GetContent<JsonResponse>();
                }
                finally
                {
                    // Remove the callback entry. It may already be gone (SendAsync removes it when
                    // the send fails), in which case there is nothing left to cancel here.
                    // NOTE(review): SetCanceled is also invoked after a normal completion, as before —
                    // confirm ManualResetValueTaskSource tolerates that.
                    ManualResetValueTaskSource<TransportMessage> value;
                    if (_resultDictionary.TryRemove(id, out value))
                    {
                        value.SetCanceled();
                    }
                }
            }

            /// <summary>
            /// Handles a received message: completes the pending callback with the same id, either
            /// with the message or with the error carried by the response.
            /// </summary>
            /// <param name="sender">Sender associated with the received message (not used here).</param>
            /// <param name="message">The received transport message.</param>
            private async Task MessageListener_Received(IMessageSender sender, TransportMessage message)
            {
                _logger.Debug("服务消费者接收到消息。");

                // Ignore messages nobody is waiting for.
                ManualResetValueTaskSource<TransportMessage> task;
                if (!_resultDictionary.TryGetValue(message.Id, out task))
                    return;

                if (message.IsInvokeResultMessage())
                {
                    var content = message.GetContent<JsonResponse>();
                    if (content.Error != null)
                    {
                        task.SetException(content.Error);
                    }
                    else
                    {
                        task.SetResult(message);
                    }
                }
            }

            #endregion Private Method
        }
  • 相关阅读:
    springboot 上传文件过大的500异常
    java OSS批量下载,并压缩为ZIP
    Java 对象转xml (dom 4j)
    windows 10 64位机器上 安装部署
    Java 读取excel 文件流
    关于Java 去除空格,换行的代码
    ORACLE 查询近一天,近半小时内的数据
    【转】C#(ASP.Net)获取当前路径的方法集合
    【转】NumPy-快速处理数据
    【转】Eclipse 常用快捷键 (动画讲解)
  • 原文地址:https://www.cnblogs.com/spritekuang/p/10805780.html
Copyright © 2011-2022 走看看