zoukankan      html  css  js  c++  java
  • 企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理

    服务端启动

      服务端启动主要做几件事情,1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置),2. 注册编解码器工厂,3. 启动dotnetty监听端口,4. 读取配置文件,解析全局消息处理模型5. 注册服务端处理对象到容器。

      JsonRpcServerModule代码如下,见备注说明

    [DependsOn(typeof(AbpKernelModule))]
        public class JsonRpcServerModule : AbpModule
        {
            public override void PreInitialize()
            {
                // 注册客户端配置,固定从Xml文件读取
                SocketServiceConfiguration socketServiceConfiguration = XmlConfigProvider.GetConfig<SocketServiceConfiguration>("SocketServiceConfiguration.xml");
                IocManager.IocContainer.Register(
                    Component
                        .For<ISocketServiceConfiguration>()
                        .Instance(socketServiceConfiguration)
                );
                IocManager.Register<IServiceExecutor, DefaultServiceExecutor>(Dependency.DependencyLifeStyle.Singleton);
            }
    
            public override void Initialize()
            {
                IocManager.RegisterAssemblyByConvention(typeof(JsonRpcServerModule).GetAssembly());
                var socketServiceConfiguration = Configuration.Modules.RpcServiceConfig();
                switch (socketServiceConfiguration.MessageCode) //  根据配置文件,编解码配置选择
                {
                    case EMessageCode.Json:
                        IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                        break;
                    case EMessageCode.MessagePack:
                        IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                        break;
                    case EMessageCode.ProtoBuffer:
                        IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                        break;
                    default:
                        break;
                }
    
                RegisterDefaultProtocol();
            }
    
            public override void PostInitialize()
            {
                var socketServiceConfiguration = IocManager.Resolve<ISocketServiceConfiguration>();
                // 方法里面调用ServiceHost构造函数传入的委托,启动dotnetty监听
                IocManager.Resolve<IServiceHost>().StartAsync(new IpAddressModel("0.0.0.0", socketServiceConfiguration.Port).CreateEndPoint());
    
                // 从配置文件读取json-rpc服务配置,解析消息处理模型
                JsonRpcRegister.LoadFromConfig(IocManager);
            }
    
            private void RegisterDefaultProtocol()
            {
                var dotNettyServerMessageListener = new DotNettyServerMessageListener(Logger,
                           IocManager.Resolve<ITransportMessageCodecFactory>(), IocManager.Resolve<ISocketServiceConfiguration>());
    
                IocManager.IocContainer.Register(
                    Component
                        .For<IMessageListener>()
                        .Instance(dotNettyServerMessageListener)
                );
    
                var serviceExecutor = IocManager.Resolve<IServiceExecutor>();
    
                // 新建一个ServiceHost对象,放入容器,这个时候dotnetty还未启动,只是定义了执行方法。
                var serverHost = new DefaultServiceHost(async endPoint =>
                {
                    await dotNettyServerMessageListener.StartAsync(endPoint); // 启动dotnetty监听
                    return dotNettyServerMessageListener;
                }, serviceExecutor);
    
                IocManager.IocContainer.Register(
                    Component
                        .For<IServiceHost>()
                        .Instance(serverHost)
                );
            }
    }

      Dotnetty启动监听代码,参考dotnetty提供的实例代码,ServerHandler为自定义消息处理Chanel

    /// <summary>
            /// 触发接收到消息事件。
            /// </summary>
            /// <param name="sender">消息发送者。</param>
            /// <param name="message">接收到的消息。</param>
            /// <returns>一个任务。</returns>
            public async Task OnReceived(IMessageSender sender, TransportMessage message)
            {
                if (Received == null)
                    return;
                await Received(sender, message);
            }
    public async Task StartAsync(EndPoint endPoint)
            {
                _logger.Debug($"准备启动服务主机,监听地址:{endPoint}。");
    
                IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1);
                IEventLoopGroup workerGroup = new MultithreadEventLoopGroup();//Default eventLoopCount is Environment.ProcessorCount * 2
                var bootstrap = new ServerBootstrap();
    
                bossGroup = new MultithreadEventLoopGroup(1);
                workerGroup = new MultithreadEventLoopGroup();
                bootstrap.Channel<TcpServerSocketChannel>();
                bootstrap
                .Option(ChannelOption.SoBacklog, _socketServiceConfiguration.Backlog)
                .ChildOption(ChannelOption.Allocator, PooledByteBufferAllocator.Default)
                .Group(bossGroup, workerGroup)
                .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                {
                    var pipeline = channel.Pipeline;
                    pipeline.AddLast(new LengthFieldPrepender(4));
                    pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                    pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder));
                    pipeline.AddLast(new ServerHandler(async (contenxt, message) =>
                    {
                        var sender = new DotNettyServerMessageSender(_transportMessageEncoder, contenxt);
                        await OnReceived(sender, message);
                    }, _logger));
                }));
                try
                {
                    _channel = await bootstrap.BindAsync(endPoint);
                    _logger.Debug($"服务主机启动成功,监听地址:{endPoint}。");
                }
                catch
                {
                    _logger.Error($"服务主机启动失败,监听地址:{endPoint}。 ");
                }
            }

      消息最终经过解码处理之后,会落到DefaultServiceExecutor类进行处理,在这里调用JsonRpcProcessor静态类的Process方法,处理Json-Rpc请求,并构造答复消息,答复客户端。

    public class DefaultServiceExecutor : IServiceExecutor
        {
            private readonly ILogger _logger;
            public DefaultServiceExecutor(ILogger logger)
            {
                _logger = logger;
            }
            public async Task ExecuteAsync(IMessageSender sender, TransportMessage message)
            {
                _logger.Debug("服务提供者接收到消息");
    
                if (!message.IsInvokeMessage())
                    return;
    
                JsonRequest jsonRequest;
                try
                {
                    jsonRequest = message.GetContent<JsonRequest>();
                }
                catch (Exception exception)
                {
                    _logger.Error("将接收到的消息反序列化成 TransportMessage<JsonRequest> 时发送了错误。", exception);
                    return;
                }
    
                _logger.Debug("准备执行本地逻辑。");
    
                var resultMessage = await LocalExecuteAsync(jsonRequest, message.Headers);
    
                //向客户端发送调用结果。
                await SendRemoteInvokeResult(sender, message.Id, JsonConvert.DeserializeObject<JsonResponse>(resultMessage));
            }
    
            private async Task<string> LocalExecuteAsync(JsonRequest jsonRequest,object headers)
            {
                return await JsonRpcProcessor.Process(JsonConvert.SerializeObject(jsonRequest), headers);
            }
    
            private async Task SendRemoteInvokeResult(IMessageSender sender, string messageId, JsonResponse resultMessage)
            {
                try
                {
                    
                    _logger.Debug("准备发送响应消息。");
    
                    await sender.SendAndFlushAsync(TransportMessage.CreateInvokeResultMessage(messageId, resultMessage, new NameValueCollection()));
                    _logger.Debug("响应消息发送成功。");
                }
                catch (Exception exception)
                {
                    _logger.Error("发送响应消息时候发生了异常。", exception);
                }
            }
        }

    这部分内容没有太多的说明,参见surging

  • 相关阅读:
    服务管理--systemctl命令
    dd if=/dev/zero of=的含义是什么?Linux 下的dd命令使用详解
    QML与Qt C++ 交互机制探讨与总结
    sync命令
    linux 下shell中if的“-e,-d,-f”是什么意思
    POJ 1936 All in All(模拟)
    POJ 1088 滑雪(记忆化搜索)
    POJ 3280 Cheapest Palindrome(DP 回文变形)
    POJ 3181 Dollar Dayz(高精度 动态规划)
    HDU 1114 Piggy-Bank(完全背包)
  • 原文地址:https://www.cnblogs.com/spritekuang/p/10805768.html
Copyright © 2011-2022 走看看