zoukankan      html  css  js  c++  java
  • 企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码

      Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging

      Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空。

    /// <summary>
        /// 传输消息模型。
        /// </summary>
        public class TransportMessage
        {
    
            public TransportMessage()
            {
            }
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public TransportMessage(object content,object headers)
            {
                if (content == null)
                    throw new ArgumentNullException(nameof(content));
    
                Content = content;
                Headers = headers;
                ContentType = content.GetType().FullName;
            }
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public TransportMessage(object content, object headers, string fullName)
            {
                if (content == null)
                    throw new ArgumentNullException(nameof(content));
    
                Headers = headers;
                Content = content;
                ContentType = fullName;
            }
    
            /// <summary>
            /// 消息Id。
            /// </summary>
            public string Id { get; set; }
    
            /// <summary>
            /// 消息内容。
            /// </summary>
            public object Content { get; set; }
    
            /// <summary>
            /// 消息传输Header
            /// </summary>
            public object Headers { get; set; }
    
            /// <summary>
            /// 内容类型。
            /// </summary>
            public string ContentType { get; set; }
    
            /// <summary>
            /// 是否调用消息。
            /// </summary>
            /// <returns>如果是则返回true,否则返回false。</returns>
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public bool IsInvokeMessage()
            {
                return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
            }
    
            /// <summary>
            /// 是否是调用结果消息。
            /// </summary>
            /// <returns>如果是则返回true,否则返回false。</returns>
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public bool IsInvokeResultMessage()
            {
                return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
            }
    
            /// <summary>
            /// 获取内容。
            /// </summary>
            /// <typeparam name="T">内容类型。</typeparam>
            /// <returns>内容实例。</returns> 
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public T GetContent<T>()
            {
                return (T)Content;
            }
    
            /// <summary>
            /// 获取Header。
            /// </summary>
            /// <typeparam name="T">Header类型。</typeparam>
            /// <returns>Header实例。</returns> 
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public T GetHeaders<T>()
            {
                return (T)Headers;
            }
    
            /// <summary>
            /// 创建一个调用传输消息。
            /// </summary>
            /// <param name="invokeMessage">调用实例。</param>
            /// <returns>调用传输消息。</returns>  
            public static TransportMessage CreateInvokeMessage(JsonRequest invokeMessage,NameValueCollection nameValueCollection)
            {
                return new TransportMessage(invokeMessage, nameValueCollection, MessagePackTransportMessageType.jsonRequestTypeName)
                {
                    Id = Guid.NewGuid().ToString("N")
                };
            }
    
            /// <summary>
            /// 创建一个调用结果传输消息。
            /// </summary>
            /// <param name="id">消息Id。</param>
            /// <param name="invokeResultMessage">调用结果实例。</param>
            /// <returns>调用结果传输消息。</returns>  
            public static TransportMessage CreateInvokeResultMessage(string id, JsonResponse jsonResponse,NameValueCollection nameValueCollection)
            {
                return new TransportMessage(jsonResponse, nameValueCollection, MessagePackTransportMessageType.jsonResponseTypeName)
                {
                    Id = id
                };
            }
        }

      TransportMessage需要在dotnetty中传输,则需要对TransportMessage进行编码解码

    消息编解码器

    public interface ITransportMessageEncoder
        {
            byte[] Encode(TransportMessage message);
    }
    public interface ITransportMessageDecoder
        {
            TransportMessage Decode(byte[] data);
        }

    Json编解码

      平时编码中经常用的方式

    public sealed class JsonTransportMessageEncoder : ITransportMessageEncoder
        {
            #region Implementation of ITransportMessageEncoder
    
            public byte[] Encode(TransportMessage message)
            {
                var content = JsonConvert.SerializeObject(message);
                return Encoding.UTF8.GetBytes(content);
            }
    
            #endregion Implementation of ITransportMessageEncoder
    }
    
    public sealed class JsonTransportMessageDecoder : ITransportMessageDecoder
        {
            #region Implementation of ITransportMessageDecoder
    
            public TransportMessage Decode(byte[] data)
            {
                var content = Encoding.UTF8.GetString(data);
                var message = JsonConvert.DeserializeObject<TransportMessage>(content);
                if (message.IsInvokeMessage())
                {
                    message.Content = JsonConvert.DeserializeObject<JsonRequest>(message.Content.ToString());
                }
                if (message.IsInvokeResultMessage())
                {
                    message.Content = JsonConvert.DeserializeObject<JsonResponse>(message.Content.ToString());
                }
                return message;
            }
    
            #endregion Implementation of ITransportMessageDecoder
        }

    MessagePack

      官网地址:https://msgpack.org/

      贴出代码,不过多的解释

    [MessagePackObject]
        public class MessagePackTransportMessage
        {
            public MessagePackTransportMessage(TransportMessage transportMessage)
            {
                Id = transportMessage.Id;
                ContentType = transportMessage.ContentType;
    
                object contentObject;
                if (transportMessage.IsInvokeMessage())
                {
                    contentObject = new MessagePackJsonRequest(transportMessage.GetContent<JsonRequest>());
                }
                else if (transportMessage.IsInvokeResultMessage())
                {
                    contentObject = new MessagePackJsonResponse(transportMessage.GetContent<JsonResponse>());
                }
                else
                {
                    throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
                }
    
                Content = SerializerUtilitys.Serialize(contentObject);
    
                var headersObject = transportMessage.GetHeaders<NameValueCollection>();
    
                Headers = SerializerUtilitys.Serialize(JsonConvert.SerializeObject(MessagePackTransportMessageType.NvcToDictionary(headersObject)));
            }
    
            public MessagePackTransportMessage()
            {
            }
    
            [Key(0)]
            public string Id { get; set; }
    
            [Key(1)]
            public byte[] Content { get; set; }
    
            [Key(2)]
            public byte[] Headers { get; set; }
    
            [Key(3)]
            public string ContentType { get; set; }
    
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public bool IsInvokeMessage()
            {
                return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
            }
    
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public bool IsInvokeResultMessage()
            {
                return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
            }
    
            public TransportMessage GetTransportMessage()
            {
                var message = new TransportMessage
                {
                    ContentType = ContentType,
                    Id = Id,
                    Content = null,
                    Headers = null,
                };
    
                object contentObject;
                if (IsInvokeMessage())
                {
                    contentObject =
                        SerializerUtilitys.Deserialize<MessagePackJsonRequest>(Content).GetJsonRequest();
                }
                else if (IsInvokeResultMessage())
                {
                    contentObject =
                        SerializerUtilitys.Deserialize<MessagePackJsonResponse>(Content)
                            .GetJsonResponse();
                }
                else
                {
                    throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
                }
                message.Content = contentObject;
                var headers = SerializerUtilitys.Deserialize<string>(Headers);
                message.Headers = JsonConvert.DeserializeObject(headers);
                return message;
            }
    }
    
    public sealed class MessagePackTransportMessageEncoder:ITransportMessageEncoder
        {
            #region Implementation of ITransportMessageEncoder
    
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public byte[] Encode(TransportMessage message)
            {
                var transportMessage = new MessagePackTransportMessage(message)
                {
                    Id = message.Id,
                    ContentType = message.ContentType,
                };
                return SerializerUtilitys.Serialize(transportMessage);
            }
            #endregion Implementation of ITransportMessageEncoder
        }
    
    public sealed class MessagePackTransportMessageDecoder : ITransportMessageDecoder
        {
            #region Implementation of ITransportMessageDecoder
    
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            public TransportMessage Decode(byte[] data)
            {
                var message = SerializerUtilitys.Deserialize<MessagePackTransportMessage>(data);
                return message.GetTransportMessage();
            }
    
            #endregion Implementation of ITransportMessageDecoder
        }

    ProtoBuffer

      这个应该听得比较多

    [ProtoContract]
        public class ProtoBufferTransportMessage
        {
            public ProtoBufferTransportMessage(TransportMessage transportMessage)
            {
                Id = transportMessage.Id;
                ContentType = transportMessage.ContentType;
    
                object contentObject;
                if (transportMessage.IsInvokeMessage())
                {
                    contentObject = new ProtoBufferJsonRequest(transportMessage.GetContent<JsonRequest>());
                }
                else if (transportMessage.IsInvokeResultMessage())
                {
                    contentObject = new ProtoBufferJsonResponse(transportMessage.GetContent<JsonResponse>());
                }
                else
                {
                    throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
                }
    
                Content = SerializerUtilitys.Serialize(contentObject);
                Headers = SerializerUtilitys.Serialize(transportMessage.GetHeaders<NameValueCollection>());
            }
    
            public ProtoBufferTransportMessage()
            {
            }
            
            [ProtoMember(1)]
            public string Id { get; set; }
            
            [ProtoMember(2)]
            public byte[] Content { get; set; }
    
            [ProtoMember(3)]
            public byte[] Headers { get; set; }
    
            [ProtoMember(4)]
            public string ContentType { get; set; }
    
        
            public bool IsInvokeMessage()
            {
                return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;
            }
    
            public bool IsInvokeResultMessage()
            {
                return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;
            }
    
            public TransportMessage GetTransportMessage()
            {
                var message = new TransportMessage
                {
                    ContentType = ContentType,
                    Id = Id,
                    Content = null,
                    Headers = null,
                };
    
                object contentObject;
                if (IsInvokeMessage())
                {
                    contentObject =
                        SerializerUtilitys.Deserialize<ProtoBufferJsonRequest>(Content).GetJsonRequest();
                }
                else if (IsInvokeResultMessage())
                {
                    contentObject =
                        SerializerUtilitys.Deserialize<ProtoBufferJsonResponse>(Content)
                            .GetJsonResponse();
                }
                else
                {
                    throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
                }
    
                message.Content = contentObject;
                message.Headers = SerializerUtilitys.Deserialize<NameValueCollection>(Headers);
    
                return message;
            }
    }
    
    public sealed class ProtoBufferTransportMessageEncoder : ITransportMessageEncoder
        {
            #region Implementation of ITransportMessageEncoder
    
            public byte[] Encode(TransportMessage message)
            {
                var transportMessage = new ProtoBufferTransportMessage(message)
                {
                    Id = message.Id,
                    ContentType = message.ContentType,
                };
    
                return SerializerUtilitys.Serialize(transportMessage);
            }
    
            #endregion Implementation of ITransportMessageEncoder
        }
    
    public sealed class ProtoBufferTransportMessageDecoder : ITransportMessageDecoder
        {
            #region Implementation of ITransportMessageDecoder
    
            public TransportMessage Decode(byte[] data)
            {
                var message = SerializerUtilitys.Deserialize<ProtoBufferTransportMessage>(data);
                return message.GetTransportMessage();
            }
    
            #endregion Implementation of ITransportMessageDecoder
        }
  • 相关阅读:
    find和findIndex原理
    npm相关依赖操作+版本问题
    package-lock锁文件作用
    npm的版本控制和切换
    package.json文件各个选项含义
    package.json中的script选项作用
    WPF学习之资源-Resources
    WPF中的ListBox实现按块显示元素的方法
    WPF中button按钮同时点击多次触发click解决方法
    浅谈WPF本质中的数据和行为
  • 原文地址:https://www.cnblogs.com/spritekuang/p/10805754.html
Copyright © 2011-2022 走看看