zoukankan      html  css  js  c++  java
  • 企业级工作流解决方案(四)--微服务消息处理模型之消息传输通道

      消息传输通道我这里只定义了3种,即:localInvoker,HttpInvoker,TcpInvoker,根据实际的情况,还可以进行扩展,比如消息队列,不过这都是后话了,先重点描述一下这3种方式。

    LocalInvoker

      本地调用直接构造请求参数,直接调用服务端的JsonRpcProcessor服务处理执行服务处理过程,这个不多说。

    HttpInvoker

      即执行http请求处理过程,由于.net framework和.net core的运行机制有所不同,处理方式也有所不同,但最终都落到服务端的JsonRpcProcessor身上进行处理,.net framework是通过增加IHttpAsyncHandler处理类处理调用请求,需要在web.config中增加处理handlers,

    <add name="jsonrpc" type="CK.Sprite.JsonRpcFramework.JsonRpcHandler, CK.Sprite.JsonRpcFramework" verb="*" path="/json.rpc"/>

    ,实现类代码如下:

    public class JsonRpcHandler : IHttpAsyncHandler
        {
            public IAsyncResult BeginProcessRequest(HttpContext context, AsyncCallback cb, object extraData)
            {
                context.Response.ContentType = "application/json";
    
                var async = new JsonRpcStateAsync(cb, context);
                async.JsonRpc = GetJsonRpcString(context.Request);
                JsonRpcProcessor.Process(async,context.Request);
                return async;
            }
    
            private static string GetJsonRpcString(System.Web.HttpRequest request)
            {
                string json = string.Empty;
                if (request.RequestType == "GET")
                {
                    json = request.Params["jsonrpc"] ?? string.Empty;
                }
                else if (request.RequestType == "POST")
                {
                    if (request.ContentType == "application/x-www-form-urlencoded")
                    {
                        json = request.Params["jsonrpc"] ?? string.Empty;
                    }
                    else
                    {
                        json = new StreamReader(request.InputStream).ReadToEnd();
                    }
                }
                return json;
            }
        }

      .net core这边的做法是增加RpcHttpRouter类,实现IRouter接口,并在StartUp的Configure方法中,增加处理Router,最终处理还是落到JsonRpcProcessor身上进行处理,代码如下:

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
            {
                app.UseAbp(); //Initializes ABP framework. Should be called first.
                // ......
                app.Map("/json.rpc", rpcApp =>
                {
                    rpcApp.UseJsonRpc();
                });
            }
    public static IApplicationBuilder UseJsonRpc(this IApplicationBuilder app)
            {
                if (app == null)
                {
                    throw new ArgumentNullException(nameof(app));
                }
                var router = new RpcHttpRouter();
                return app.UseRouter(router);
            }
    public class RpcHttpRouter : IRouter
        {
            public VirtualPathData GetVirtualPath(VirtualPathContext context)
            {
                return null;
            }
    
            /// <summary>
            /// Takes a route/http contexts and attempts to parse, invoke, respond to an Rpc request
            /// </summary>
            /// <param name="context">Route context</param>
            /// <returns>Task for async routing</returns>
            public async Task RouteAsync(RouteContext context)
            {
                try
                {
                    string jsonString = await GetJsonRpcString(context);
    
                    string responseJson = await JsonRpcProcessor.Process(jsonString);
    
                    if (string.IsNullOrEmpty(responseJson))
                    {
                        //No response required, but status code must be 204
                        context.HttpContext.Response.StatusCode = 204;
                        context.MarkAsHandled();
                        return;
                    }
    
                    context.HttpContext.Response.ContentType = "application/json";
                    await context.HttpContext.Response.WriteAsync(responseJson);
    
                    context.MarkAsHandled();
    
                }
                catch (Exception ex)
                {
                    context.MarkAsHandled();
                }
            }
    
            private async Task<string> GetJsonRpcString(RouteContext context)
            {
                string jsonString;
                if (context.HttpContext.Request.Body == null)
                {
                    jsonString = null;
                }
                else
                {
                    using (StreamReader streamReader = new StreamReader(context.HttpContext.Request.Body, Encoding.UTF8,
                        detectEncodingFromByteOrderMarks: true,
                        bufferSize: 1024,
                        leaveOpen: true))
                    {
                        try
                        {
                            jsonString = await streamReader.ReadToEndAsync();
                        }
                        catch (TaskCanceledException ex)
                        {
                            throw new CPlatformException("Cancelled while reading the request.", ex);
                        }
                        jsonString = jsonString.Trim();
                    }
    
                }
    
                return jsonString;
            }
        }
    
        public static class RouteContextExtensions
        {
            public static void MarkAsHandled(this RouteContext context)
            {
                context.Handler = c => Task.FromResult(0);
            }
        }

    TcpInvoker

      Tcp方式的调用后续文章会继续进行分解,也是微服务核心价值的地方。

  • 相关阅读:
    drupal drush 在windows下的安装和配置
    Drupal 7 配置ckeditor和ckfinder编辑器实现图片上传--不用wysisyg
    阿里云Centos配置iptables防火墙
    25个最常用的iptables策略
    防简单攻击iptables策略
    iptables防DDOS攻击和CC攻击设置
    Linux Web服务器网站故障分析常用的命令
    Linux/CentOS防CC攻击脚本
    Map字符串类型去掉空格处理
    读文件字节流大小的动态设置
  • 原文地址:https://www.cnblogs.com/spritekuang/p/10805629.html
Copyright © 2011-2022 走看看