zoukankan      html  css  js  c++  java
  • 企业级工作流解决方案(五)--微服务消息处理模型之客户端端处理

      微服务的服务端已经启动起来了,服务消费者怎么知道服务在哪个地方,通过什么方式调用呢,分布式如何选择正确的服务器调用服务?

      这个就涉及到服务发现、服务健康检查的问题了,很多微服务架构的做法都是通过消息队列来实现的,消息队列天生就支持发布订阅功能,服务有变化之后,发布通知,每个消费者更新状态,还涉及到更新服务的metadata信息,同时还涉及到服务健康检查等等一系列功能,其实这些地方是非常容易出问题的地方,但是对于规模流量不是特别巨大的企业,这部分职责可以进行转移,服务的发现就直接通过配置文件实现,服务的寻址和健康检查就通过nginx来实现。等企业足够强大的时候,再来补全这部分内容,但对于微服务Rpc调用过程来说,没有任何影响。

    客户端配置

      客户端配置文件,格式如下:

    <!--MessageCode(Json = 1,MessagePack = 2,ProtoBuffer = 3)-->
    <SocketClientConfiguration MessageCode="MessagePack">
      <ClientConnectServerInfos>
        <!--Url(Http连接服务方式为Url地址,Tcp连接方式为:IP:Port)、ConnectServerType(Tcp = 1,Http = 2,Local = 3)、ServerName(服务名称)-->
        <!--<ClientConnectServerInfo ServerName="AuthCenterService" ConnectServerType="Tcp" Url="127.0.0.1:1314"></ClientConnectServerInfo>-->
        <ClientConnectServerInfo ServerName="AuthCenterService" ConnectServerType="Http" Url="http://localhost:9527/json.rpc"></ClientConnectServerInfo>
        <ClientConnectServerInfo ServerName="FlowDefineService" ConnectServerType="Tcp" Url="127.0.0.1:2019"></ClientConnectServerInfo>
        <ClientConnectServerInfo ServerName="WorkflowRuntimeService" ConnectServerType="Tcp" Url="127.0.0.1:2019"></ClientConnectServerInfo>
      </ClientConnectServerInfos>
    </SocketClientConfiguration>

      ServerName即服务名称,与之前文章介绍的服务名称必须一致,ConnectServerType即传输方式,如果传输方式为Local的,可以不在配置文件里面配置,系统自动在本地服务中查找服务调用,Url为服务地址,Tcp方式为”IP地址:端口”,客户端启动的时候,会读取配置文件,存储到全局配置里面,代码如下:

    [DependsOn(typeof(AbpKernelModule))]
        public class JsonRpcClientModule : AbpModule
        {
            public override void PreInitialize()
            {
                // 注册客户端配置,固定从Xml文件读取
                SocketClientConfiguration socketClientConfiguration = XmlConfigProvider.GetConfig<SocketClientConfiguration>("SocketClientConfiguration.xml");
                IocManager.IocContainer.Register(
                    Component
                        .For<ISocketClientConfiguration>()
                        .Instance(socketClientConfiguration)
                );
                switch (socketClientConfiguration.MessageCode)
                {
                    case EMessageCode.Json:
                        IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                        break;
                    case EMessageCode.MessagePack:
                        IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                        break;
                    case EMessageCode.ProtoBuffer:
                        IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                        break;
                }
            }
    
            public override void Initialize()
            {
                IocManager.RegisterAssemblyByConvention(typeof(JsonRpcClientModule).GetAssembly());
                var dotNettyTransportClientFactory = new DotNettyTransportClientFactory(IocManager.Resolve<ITransportMessageCodecFactory>(), Logger);
    
                IocManager.IocContainer.Register(
                    Component
                        .For<ITransportClientFactory>()
                        .Instance(dotNettyTransportClientFactory)
                );
            }
    
            public override void PostInitialize()
            {
                var socketClientConfiguration = Configuration.Modules.RpcClientConfig();
                var transportClientFactory = IocManager.Resolve<ITransportClientFactory>();
                try
                {
                    foreach (var clientConnectServerInfo in socketClientConfiguration.ClientConnectServerInfos) // 预连接
                    {
                        if (clientConnectServerInfo.ConnectServerType == EConnectServerType.Tcp)
                        {
                            var tcpAddress = clientConnectServerInfo.Url.Split(new char[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
                            transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint());
                        }
                    }
                }
                catch(Exception ex) // 预连,出错不处理
                {
    
                }
            }
        }

    客户端调用

      客户端调用示例:var loginResult = Rpc.Call<AuthenticateResultModel>("AuthCenterService.AuthCenterServiceAppService.Authenticate", userId, password);

      代码实现如下:

    /// <summary>
            /// 调用JSON-RPC服务(服务地址需要配置在App.config中的Setting下的JsonRpcServiceUrl的Key值)。
            /// </summary>
            /// <typeparam name="T">JSON-RPC方法返回的Result的对象类型。</typeparam>
            /// <param name="method">JSON-RPC方法名。</param>
            /// <param name="args">JSON-RPC方法接收的参数,此参数为可变数组</param>
            /// <returns></returns>
            public static T Call<T>(string method, params object[] args)
            {
                var rpcInvoker = GetRpcInvoker(method, null);
                var jresp = rpcInvoker.Invoke<T>(method, args);
                if (jresp.Error != null)
                    throw jresp.Error;
    
                return jresp.Result;
            }
    
    private static RpcInvoker GetRpcInvoker(string method, RpcOption option)
            {
                var socketClientConfiguration = Dependency.IocManager.Instance.Resolve<ISocketClientConfiguration>();
                var methodInfos = method.Split(new char[] { '.' });
                var callConfiguration = socketClientConfiguration.ClientConnectServerInfos.FirstOrDefault(r => r.ServerName == methodInfos[0]);
    
                RpcInvoker invoker = null;
                if(callConfiguration == null)
                {
                    invoker = new LocalRpcInvoker();
                }
                else
                {
                    switch (callConfiguration.ConnectServerType)
                    {
                        case EConnectServerType.Tcp:
                            invoker = new TcpRpcInvoker();
                            break;
                        case EConnectServerType.Http:
                            invoker = new HttpRpcInvoker();
                            break;
                        case EConnectServerType.Local:
                            invoker = new LocalRpcInvoker();
                            break;
                        default:
                            break;
                    }
                    invoker.ServiceAddress = callConfiguration.Url;
                }
    
                invoker.Option = option;
                return invoker;
            }
        }

      三种传输方式的Invoker处理逻辑有所不同,LocalInvoker直接调用服务端处理程序处理,HttpInvoker则构造Http请求,TcpInvoker则发起Tcp调用过程传输消息,三种过程都需要处理请求Header。

    internal class LocalRpcInvoker : RpcInvoker
        {
            internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
            {
                int myId;
    
                if (jsonRpc.Id == null)
                {
                    lock (idLock)
                    {
                        myId = ++id;
                    }
    
                    jsonRpc.Id = myId.ToString();
                }
    
                var jsonReqStr = JsonConvert.SerializeObject(jsonRpc);
                var contextNameValueCollection = new NameValueCollection();
                Rpc.GlobalContextSet?.Invoke(contextNameValueCollection);
                MergeContextValues(contextNameValueCollection, this.Option);
    
                var abpSession = Dependency.IocManager.Instance.Resolve<IAbpSession>();
                if (!string.IsNullOrEmpty(abpSession.AccessToken)) // 将用户Token转递到其他微服务
                {
                    contextNameValueCollection.Add("AccessToken", "1");
                    contextNameValueCollection.Add("UserId", abpSession.UserId?.ToString());
                    contextNameValueCollection.Add("UserName", abpSession.UserName);
                    contextNameValueCollection.Add("TenantId", abpSession.TenantId?.ToString());
                    contextNameValueCollection.Add("RoleIds", abpSession.RoleIds);
    
                }
                var jsonRespStr = LocalRpcRun(jsonReqStr, contextNameValueCollection);
                JsonResponse<T> rjson = JsonConvert.DeserializeObject<JsonResponse<T>>(jsonRespStr);
    
                if (rjson == null)
                {
                    if (!string.IsNullOrEmpty(jsonRespStr))
                    {
                        JObject jo = JsonConvert.DeserializeObject(jsonRespStr) as JObject;
                        throw new Exception(jo["Error"].ToString());
                    }
                    else
                    {
                        throw new Exception("Empty response");
                    }
                }
    
                return rjson;
            }
    
            private static string LocalRpcRun(string jsonReqStr, NameValueCollection contextNameValueCollection)
            {
                return JsonRpcProcessor.Process(jsonReqStr, contextNameValueCollection).Result;
            }
        }
    
    internal class HttpRpcInvoker : RpcInvoker
        {
    
            public static bool EnabledGzip = true;
    
            private static Stream CopyAndClose(Stream inputStream)
            {
                const int readSize = 4096;
                byte[] buffer = new byte[readSize];
                MemoryStream ms = new MemoryStream();
    
                int count = inputStream.Read(buffer, 0, readSize);
                while (count > 0)
                {
                    ms.Write(buffer, 0, count);
                    count = inputStream.Read(buffer, 0, readSize);
                }
                ms.Position = 0;
                inputStream.Close();
                return ms;
            }
    
    
            internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
            {
                HttpWebRequest req = null;
    
                int myId;
    
                if (jsonRpc.Id == null)
                {
                    lock (idLock)
                    {
                        myId = ++id;
                    }
    
                    jsonRpc.Id = myId.ToString();
                }
    
                req = WebRequest.Create(new Uri(ServiceAddress + "?callid=" + jsonRpc.Id.ToString() + "&method=" + jsonRpc.Method)) as HttpWebRequest;
                req.KeepAlive = false;
                req.Proxy = null;
                req.Method = "Post";
                req.ContentType = "application/json-rpc";
                if (Rpc.GlobalContextSet != null)
                    Rpc.GlobalContextSet(req.Headers);
    
                var accessToken = Dependency.IocManager.Instance.Resolve<IAbpSession>().AccessToken;
                if(!string.IsNullOrEmpty(accessToken)) // 将用户Token转递到其他微服务
                {
                    req.Headers.Add("Authorization", accessToken);
    
                }
                if(!string.IsNullOrEmpty(Rpc.GlobalAccessToken))
                {
                    req.Headers.Add("Authorization", "Bearer " + Rpc.GlobalAccessToken);
                }
                MergeContextValues(req.Headers, this.Option);
                if (this.Option != null && this.Option.Timeout > 0)
                {
                    req.Timeout = this.Option.Timeout;
                }
                else
                {
                    req.Timeout = 400000;
                }
                req.ReadWriteTimeout = req.Timeout;
                if (EnabledGzip)
                {
                    req.Headers["Accept-Encoding"] = "gzip";
                }
    
                var stream = new StreamWriter(req.GetRequestStream());
                var json = Newtonsoft.Json.JsonConvert.SerializeObject(jsonRpc);
                stream.Write(json);
                stream.Close();
    
                var resp = req.GetResponse();
                string sstream;
    
                string contentEncoding = resp.Headers["Content-Encoding"];
    
                if (contentEncoding != null && contentEncoding.Contains("gzip"))
                {
                    var mstream = CopyAndClose(resp.GetResponseStream());
    
                    using (var gstream = new GZipStream(mstream, CompressionMode.Decompress))
                    {
                        using (var reader = new StreamReader(gstream, UTF8Encoding))
                        {
                            sstream = reader.ReadToEnd();
                        }
                    }
                }
                else
                {
                    using (var rstream = new StreamReader(CopyAndClose(resp.GetResponseStream())))
                    {
                        sstream = rstream.ReadToEnd();
                    }
                }
                resp.Close();
                JsonResponse<T> rjson = Newtonsoft.Json.JsonConvert.DeserializeObject<JsonResponse<T>>(sstream);
    
                if (rjson == null)
                {
                    if (!string.IsNullOrEmpty(sstream))
                    {
                        JObject jo = Newtonsoft.Json.JsonConvert.DeserializeObject(sstream) as JObject;
                        throw new Exception(jo["Error"].ToString());
                    }
                    else
                    {
                        throw new Exception("Empty response");
                    }
                }
    
                return rjson;
            }
    
        }
    
    internal class TcpRpcInvoker : RpcInvoker
        {
            internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
            {
                int myId;
    
                if (jsonRpc.Id == null)
                {
                    lock (idLock)
                    {
                        myId = ++id;
                    }
    
                    jsonRpc.Id = myId.ToString();
                }
    
                var jsonReqStr = JsonConvert.SerializeObject(jsonRpc);
                var contextNameValueCollection = new NameValueCollection();
                Rpc.GlobalContextSet?.Invoke(contextNameValueCollection);
                MergeContextValues(contextNameValueCollection, this.Option);
    
                var abpSession = Dependency.IocManager.Instance.Resolve<IAbpSession>();
                if (!string.IsNullOrEmpty(abpSession.AccessToken)) // 将用户Token转递到其他微服务
                {
                    contextNameValueCollection.Add("AccessToken", "1");
                    contextNameValueCollection.Add("UserId", abpSession.UserId?.ToString());
                    contextNameValueCollection.Add("UserName", abpSession.UserName);
                    contextNameValueCollection.Add("TenantId", abpSession.TenantId?.ToString());
                    contextNameValueCollection.Add("RoleIds", abpSession.RoleIds);
    
                }
    
                var jsonRespStr = LocalRpcRun(jsonReqStr, contextNameValueCollection,this.ServiceAddress);
                JsonResponse<T> rjson = JsonConvert.DeserializeObject<JsonResponse<T>>(jsonRespStr);
    
                if (rjson == null)
                {
                    if (!string.IsNullOrEmpty(jsonRespStr))
                    {
                        JObject jo = JsonConvert.DeserializeObject(jsonRespStr) as JObject;
                        throw new Exception(jo["Error"].ToString());
                    }
                    else
                    {
                        throw new Exception("Empty response");
                    }
                }
    
                return rjson;
            }
    
            private static string LocalRpcRun(string jsonReqStr, NameValueCollection contextNameValueCollection, string serverAddress)
            {
                var transportClientFactory = Dependency.IocManager.Instance.Resolve<ITransportClientFactory>();
                var req = JsonConvert.DeserializeObject<JsonRequest>(jsonReqStr);
                var tcpAddress = serverAddress.Split(new char[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
                var result = transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint())
              .SendAsync(req, contextNameValueCollection, Task.Factory.CancellationToken).Result;
    return JsonConvert.SerializeObject(result); } }
  • 相关阅读:
    D触发器深入详细介绍(zhuanzai)
    脉冲
    数字电路中时序
    嵌入式中对某一位清0或置1
    8本推荐阅读的UX书籍
    Hadoop之HDFS的Shell操作
    Hadoop之HDFS概述
    Hadoop之搭建完全分布式运行模式
    Hadoop之运行模式
    Hadoop之运行环境搭建
  • 原文地址:https://www.cnblogs.com/spritekuang/p/10805700.html
Copyright © 2011-2022 走看看