  • Advanced WCF: Compressing the Encoded Byte Stream for Transport

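      The previous two articles, "Advanced WCF: Base64-Encoding the Message Body" and "Advanced WCF: Attaching Identity Information to Each Operation", showed how to log or modify messages by intercepting them. Under certain conditions that approach can change how a message is encoded, but it is not the standard way to do so and its applicability is limited. WCF ships with three message encoders (MessageEncoder) by default: TextMessageEncoder, BinaryMessageEncoder, and MtomMessageEncoder. These correspond to the three formats in which XML can be written: Text, Binary, and MTOM. XmlDictionaryWriter likewise offers three factory methods, CreateTextWriter, CreateBinaryWriter, and CreateMtomWriter, which write XML as text, binary, and MTOM respectively. Each format has trade-offs: Text is easy to read and the most interoperable, Binary is compact, and MTOM is an optimized form suited to transferring large binary payloads.

      Whichever format is used, what finally travels over the network is a byte stream (a byte array). The last element in a Binding is always the TransportBindingElement, so by the time the data reaches it, the data is already a byte array. Above the TransportBindingElement there can be elements for transactions, sessions, message security, encoding, transport security, and so on. Before the encoder, the object being processed is a Message; after it, the object is a Stream (byte[]). This also explains why, when studying the security model, security is usually divided into message-level security and transport-level security.

      What we implement here is compressing the byte stream that the encoder has already produced, before it is transmitted, rather than message encoding in the traditional sense. This distinction is worth understanding well.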
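      To make the difference between the formats concrete, the sketch below writes the same small XML document through CreateTextWriter and CreateBinaryWriter and reports how many bytes each produces. The class name WriterFormatDemo, the element names, and the payload are made up for illustration; CreateMtomWriter is left out because it needs extra arguments (encoding, size limit, start info).

```csharp
using System;
using System.IO;
using System.Xml;

public static class WriterFormatDemo
{
    // Writes the same small document through the supplied writer factory
    // and returns how many bytes ended up in the stream.
    public static long Measure(Func<Stream, XmlDictionaryWriter> createWriter)
    {
        MemoryStream ms = new MemoryStream();
        XmlDictionaryWriter writer = createWriter(ms);

        writer.WriteStartElement("Orders");
        for (int i = 0; i < 100; i++)
        {
            writer.WriteStartElement("Order");
            writer.WriteAttributeString("id", i.ToString());
            writer.WriteString("sample payload");
            writer.WriteEndElement();
        }
        writer.WriteEndElement();
        writer.Flush();

        // Read the length before closing: closing the writer also closes the stream.
        long length = ms.Length;
        writer.Close();
        return length;
    }

    // Returns a one-line comparison of the two formats.
    public static string Compare()
    {
        long text = Measure(s => XmlDictionaryWriter.CreateTextWriter(s));
        long binary = Measure(s => XmlDictionaryWriter.CreateBinaryWriter(s));
        return "Text: " + text + " bytes, Binary: " + binary + " bytes";
    }
}
```

      Calling WriterFormatDemo.Compare() from a console program prints both sizes side by side; the exact numbers depend on the document being written.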

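      Although what we are implementing is compression at the transport level, the mechanism is the same as for a custom MessageEncoder. MessageEncoder is the base class of all encoders, and it has several important methods and properties: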

    public abstract Message ReadMessage(ArraySegment<byte> buffer, BufferManager bufferManager, string contentType);
    public ArraySegment<byte> WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager);
    // Properties
    public abstract string ContentType { get; }
    public abstract string MediaType { get; }
    public abstract MessageVersion MessageVersion { get; }

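    ReadMessage and WriteMessage convert between a Message and a byte array. These two methods (each has several overloads) are the most important ones when implementing a custom encoder, and the custom conversion is done mainly by overriding them. As noted above, we are implementing compression at the transport level, so an underlying (inner) encoder is still needed to do the actual message encoding. For this we designed a custom encoder class named CompressEncoder: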
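    Before looking at the custom class, it may help to see the Message <-> byte array conversion on its own. The sketch below uses the built-in text encoder to write a Message into a pooled buffer and read it back. The class name EncoderRoundTripDemo, the action URI, and the pool sizes are arbitrary choices for illustration, not values from the project.

```csharp
using System;
using System.ServiceModel.Channels;

public static class EncoderRoundTripDemo
{
    // Encodes a message carrying 'text' to bytes and decodes it again.
    public static string RoundTrip(string text)
    {
        MessageEncoder encoder = new TextMessageEncodingBindingElement()
            .CreateMessageEncoderFactory().Encoder;
        // Pool sizes are illustrative only.
        BufferManager bufferManager = BufferManager.CreateBufferManager(1024 * 1024, 64 * 1024);

        // Message -> bytes
        Message outgoing = Message.CreateMessage(encoder.MessageVersion, "urn:demo:Echo", text);
        ArraySegment<byte> bytes = encoder.WriteMessage(outgoing, int.MaxValue, bufferManager);

        // bytes -> Message
        Message incoming = encoder.ReadMessage(bytes, bufferManager, encoder.ContentType);
        return incoming.GetBody<string>();
    }
}
```

    A compressing encoder sits exactly between these two steps: it transforms the ArraySegment<byte> after WriteMessage and before ReadMessage.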

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.ServiceModel.Channels;
    using System.IO;
    
    namespace RobinLib
    {
        public class CompressEncoder : MessageEncoder
        {
            CompressEncoderFactory factory;
            MessageEncoder innserEncoder;
            private CompressAlgorithm algorithm;
    
            public CompressEncoder(CompressEncoderFactory encoderFactory, CompressAlgorithm algorithm)
            {
                factory = encoderFactory;
                this.algorithm = algorithm;
                innserEncoder = factory.InnerMessageEncodingBindingElement.CreateMessageEncoderFactory().Encoder;
            }
            public override string ContentType
            {
                get { return innserEncoder.ContentType; }
            }
            public override string MediaType
            {
                get { return innserEncoder.MediaType; }
            }
            public override MessageVersion MessageVersion
            {
                get { return innserEncoder.MessageVersion; }
            }
            public override bool IsContentTypeSupported(string contentType)
            {
                return innserEncoder.IsContentTypeSupported(contentType);
            }
            public override T GetProperty<T>()
            {
                return innserEncoder.GetProperty<T>();
            }
            public override Message ReadMessage(ArraySegment<byte> buffer, BufferManager bufferManager, string contentType)
            {
                // Decompress the received bytes first
                ArraySegment<byte> bytes = new Compressor(algorithm).DeCompress(buffer);
                // The compressed input buffer is no longer needed, so hand it back to the pool
                bufferManager.ReturnBuffer(buffer.Array);
                // Copy the decompressed bytes into a pooled buffer; it must stay valid
                // while the inner encoder reads it, so it is not returned here
                int totalLength = bytes.Count;
                byte[] totalBytes = bufferManager.TakeBuffer(totalLength);
                Array.Copy(bytes.Array, bytes.Offset, totalBytes, 0, bytes.Count);
                ArraySegment<byte> byteArray = new ArraySegment<byte>(totalBytes, 0, bytes.Count);
                Message msg = innserEncoder.ReadMessage(byteArray, bufferManager, contentType);
                return msg;
            }
            public override Message ReadMessage(System.IO.Stream stream, int maxSizeOfHeaders, string contentType)
            {
                // The incoming stream is compressed and must be decompressed before it is read
                Stream ms = new Compressor(algorithm).DeCompress(stream); 
                Message msg = innserEncoder.ReadMessage(ms, maxSizeOfHeaders, contentType);
                return msg;
            }
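            public override ArraySegment<byte> WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager, int messageOffset)
            {
                // Encode with the inner encoder, then compress the resulting bytes
                ArraySegment<byte> bytes = innserEncoder.WriteMessage(message, maxMessageSize, bufferManager);
                ArraySegment<byte> buffer = new Compressor(algorithm).Compress(bytes);
                int originalLength = bytes.Count;
                // The uncompressed buffer came from the pool and is no longer needed
                bufferManager.ReturnBuffer(bytes.Array);
                // Copy the compressed bytes into a pooled buffer, leaving messageOffset bytes free at the front
                int totalLength = buffer.Count + messageOffset;
                byte[] totalBytes = bufferManager.TakeBuffer(totalLength);
                Array.Copy(buffer.Array, buffer.Offset, totalBytes, messageOffset, buffer.Count);
                ArraySegment<byte> byteArray = new ArraySegment<byte>(totalBytes, messageOffset, buffer.Count);
                Console.WriteLine("Algorithm: " + algorithm + ", original size: " + originalLength + " bytes, compressed size: " + byteArray.Count + " bytes");
                return byteArray;
            }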
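            public override void WriteMessage(Message message, System.IO.Stream stream)
            {
                System.IO.MemoryStream ms = new System.IO.MemoryStream();
                innserEncoder.WriteMessage(message, ms);
                // Rewind so that Compress reads the encoded message from the start
                ms.Seek(0, SeekOrigin.Begin);
                Stream compressed = new Compressor(algorithm).Compress(ms);
                compressed.Seek(0, SeekOrigin.Begin);
                // Write into the caller's stream; reassigning the parameter would have no effect
                byte[] chunk = new byte[4096];
                int read;
                while ((read = compressed.Read(chunk, 0, chunk.Length)) > 0)
                {
                    stream.Write(chunk, 0, read);
                }
            }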
        }
    }

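    This class needs to know which encoder it wraps, which is what the MessageEncoder innserEncoder field specifies. In WriteMessage, the message is first converted to a byte array by the inner encoder, the array is then compressed, and the compressed array is passed on to the next layer. When reading a Message, the received byte array is first decompressed, and the decompressed array is then converted to a Message object by the inner encoder. Compressor is a helper class that compresses and decompresses byte arrays and streams; its code is: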
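    The listings use a CompressAlgorithm type whose definition is not shown in the article. The sketch below assumes it is a plain enum with the two values that appear in the code (GZip and Deflate) and demonstrates the compress/decompress round trip in isolation. The name CompressAlgorithmSketch and the class RoundTripDemo are hypothetical and exist only so the example stands alone.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Assumed shape of the enum used by the article's listings (not shown in the source).
public enum CompressAlgorithmSketch { GZip, Deflate }

public static class RoundTripDemo
{
    // Opens the compression stream that matches the selected algorithm.
    static Stream Open(Stream inner, CompressAlgorithmSketch algorithm, CompressionMode mode)
    {
        if (algorithm == CompressAlgorithmSketch.GZip)
        {
            return new GZipStream(inner, mode, true);
        }
        return new DeflateStream(inner, mode, true);
    }

    public static byte[] Compress(ArraySegment<byte> data, CompressAlgorithmSketch algorithm)
    {
        using (MemoryStream output = new MemoryStream())
        {
            Stream zip = Open(output, algorithm, CompressionMode.Compress);
            // Use Offset as well as Count: a segment need not start at index 0.
            zip.Write(data.Array, data.Offset, data.Count);
            zip.Close(); // closing flushes the remaining compressed bytes
            return output.ToArray();
        }
    }

    public static byte[] Decompress(byte[] data, CompressAlgorithmSketch algorithm)
    {
        using (MemoryStream input = new MemoryStream(data))
        using (MemoryStream output = new MemoryStream())
        {
            Stream zip = Open(input, algorithm, CompressionMode.Decompress);
            byte[] chunk = new byte[4096];
            int read;
            while ((read = zip.Read(chunk, 0, chunk.Length)) > 0)
            {
                output.Write(chunk, 0, read);
            }
            zip.Close();
            return output.ToArray();
        }
    }
}
```

    Decompressing the output of Compress with the same algorithm should return the original bytes; mixing the two algorithms is expected to fail, which is why both ends of a channel have to agree on one.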

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.IO;
    using System.IO.Compression;
    namespace RobinLib
    {
        public class Compressor
        {
            private CompressAlgorithm algorithm;
    
            public Compressor(CompressAlgorithm algorithm)
            {
                this.algorithm = algorithm;
            }
    
    
    
            // Compress a byte array
            public ArraySegment<byte> Compress(ArraySegment<byte> data)
            {
                MemoryStream ms = new MemoryStream();
    
                if (algorithm == CompressAlgorithm.GZip)
                {
                    Stream compressStream = new GZipStream(ms, CompressionMode.Compress, true);
                    // Honor data.Offset: the segment may not start at index 0
                    compressStream.Write(data.Array, data.Offset, data.Count);
                    compressStream.Close();
                }
                else
                {
                    Stream compressStream = new DeflateStream(ms, CompressionMode.Compress, true);
                    compressStream.Write(data.Array, data.Offset, data.Count);
                    compressStream.Close();
                }
                // ToArray copies everything written so far, regardless of the stream position
                byte[] newByteArray = ms.ToArray();
    
                ArraySegment<byte> bytes = new ArraySegment<byte>(newByteArray);
                return bytes;
            }
    
            // Compress a stream
            public Stream Compress(Stream stream)
            {
                MemoryStream ms = new MemoryStream();
                if (algorithm == CompressAlgorithm.GZip)
                {
                    Stream compressStream = new GZipStream(ms, CompressionMode.Compress, true);
                    byte[] buffer = new byte[stream.Length];
                    stream.Read(buffer, 0, buffer.Length);
                    compressStream.Write(buffer, 0, buffer.Length);
                    compressStream.Close();
                }
                else
                {
                    Stream compressStream = new DeflateStream(ms, CompressionMode.Compress, true);
                    byte[] buffer = new byte[stream.Length];
                    stream.Read(buffer, 0, buffer.Length);
                    compressStream.Write(buffer, 0, buffer.Length);
                    compressStream.Close();
                }
                return ms;
            }
    
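            // Decompress a byte array
            public ArraySegment<byte> DeCompress(ArraySegment<byte> data)
            {
                MemoryStream ms = new MemoryStream();
    
                // Honor data.Offset: the transport may hand over a segment that does not start at index 0
                ms.Write(data.Array, data.Offset, data.Count);
                ms.Seek(0, SeekOrigin.Begin);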
                if (algorithm == CompressAlgorithm.GZip)
                {
                    Stream compressStream = new GZipStream(ms, CompressionMode.Decompress, false);
                    byte[] newByteArray = RetrieveBytesFromStream(compressStream, 1);
                    compressStream.Close();
                    return new ArraySegment<byte>(newByteArray);
                }
                else
                {
                    Stream compressStream = new DeflateStream(ms, CompressionMode.Decompress, false);
                    byte[] newByteArray = RetrieveBytesFromStream(compressStream, 1);
                    compressStream.Close();
                    return new ArraySegment<byte>(newByteArray);
                }
            }
    
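            // Decompress a stream
            public Stream DeCompress(Stream stream)
            {
                // Network streams are usually not seekable, so only rewind when the stream supports it
                if (stream.CanSeek)
                {
                    stream.Seek(0, SeekOrigin.Begin);
                }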
                if (algorithm == CompressAlgorithm.GZip)
                {
                    Stream compressStream = new GZipStream(stream, CompressionMode.Decompress, false);
                    byte[] newByteArray = RetrieveBytesFromStream(compressStream, 1);
                    compressStream.Close();
                    return new MemoryStream(newByteArray);
                }
                else
                {
                    Stream compressStream = new DeflateStream(stream, CompressionMode.Decompress, false);
                    byte[] newByteArray = RetrieveBytesFromStream(compressStream, 1);
                    compressStream.Close();
                    return new MemoryStream(newByteArray);
                }
            }
            public static byte[] RetrieveBytesFromStream(Stream stream, int bytesblock)
            {
    
                List<byte> lst = new List<byte>();
                byte[] data = new byte[1024];
                int totalCount = 0;
                while (true)
                {
                    int bytesRead = stream.Read(data, 0, data.Length);
                    if (bytesRead == 0)
                    {
                        break;
                    }
                    byte[] buffers = new byte[bytesRead];
                    Array.Copy(data, buffers, bytesRead);
                    lst.AddRange(buffers);
                    totalCount += bytesRead;
                }
                return lst.ToArray();
            }
        }
    }

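    At this point the custom encoder is finished, and the next question is how to use it. Every MessageEncoder has a corresponding MessageEncoderFactory, which is returned by a MessageEncodingBindingElement. When the custom BindingElement builds the channel listener (BuildChannelListener) and the channel factory (BuildChannelFactory), it adds itself to the binding parameters of the BindingContext, and that is how the CompressEncoder implemented above ends up being used.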
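    The chain from binding element to factory to encoder can be observed with the built-in encoders alone. The sketch below walks that chain for any MessageEncodingBindingElement and reports what it finds; EncoderChainDemo is a hypothetical helper, not part of RobinLib.

```csharp
using System.ServiceModel.Channels;

public static class EncoderChainDemo
{
    // Follows BindingElement -> MessageEncoderFactory -> MessageEncoder
    // and summarizes the encoder that comes out at the end.
    public static string Describe(MessageEncodingBindingElement element)
    {
        MessageEncoderFactory factory = element.CreateMessageEncoderFactory();
        MessageEncoder encoder = factory.Encoder;
        return encoder.GetType().Name
            + " | media type: " + encoder.MediaType
            + " | version: " + encoder.MessageVersion;
    }
}
```

    Passing new TextMessageEncodingBindingElement() or new BinaryMessageEncodingBindingElement() shows the two built-in chains; a wrapping element such as the one defined below plugs into the same three-step chain.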

    The CompressEncoderFactory implementation is:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.ServiceModel.Channels;
    
    namespace RobinLib
    {
        public class CompressEncoderFactory:MessageEncoderFactory
        {
            private MessageEncodingBindingElement innerMessageEncodingBindingElement;
            CompressEncoder messageEncoder;
            private CompressAlgorithm algorithm;
            public CompressEncoderFactory(MessageEncodingBindingElement innerMessageEncodingBindingElement, CompressAlgorithm algorithm)
            {
                this.innerMessageEncodingBindingElement = innerMessageEncodingBindingElement;
                this.algorithm = algorithm;
                messageEncoder = new CompressEncoder(this,algorithm);
            }
            public override MessageEncoder CreateSessionEncoder()
            {
                return base.CreateSessionEncoder();
            }
            public override MessageEncoder Encoder
            {
                get { return messageEncoder; }
            }
            public override MessageVersion MessageVersion
            {
                get { return innerMessageEncodingBindingElement.MessageVersion; }
            }
            public MessageEncodingBindingElement InnerMessageEncodingBindingElement
            {
                get
                {
                    return innerMessageEncodingBindingElement;
                }
            }
        }
    }

    The custom MessageEncodingBindingElement is:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.ServiceModel.Channels;
    using System.ServiceModel;
    using System.Xml;
    
    namespace RobinLib
    {
        public sealed class CompressEncodingBindingElement : MessageEncodingBindingElement
        { 
            private XmlDictionaryReaderQuotas readerQuotas;
            private MessageEncodingBindingElement innerMessageEncodingBindingElement;
            private CompressAlgorithm algorithm;
            public MessageEncodingBindingElement InnerMessageEncodingBindingElement
            {
                get
                {
                    return innerMessageEncodingBindingElement;
                }
            }
    
            public CompressAlgorithm CompressAlgorithm
            {
                get
                {
                    return algorithm;
                }
            }
    
            public CompressEncodingBindingElement(MessageEncodingBindingElement innerMessageEncodingBindingElement, CompressAlgorithm algorithm)
            {
                this.readerQuotas = new XmlDictionaryReaderQuotas();
                this.algorithm = algorithm;
                this.innerMessageEncodingBindingElement = innerMessageEncodingBindingElement;
            }
    
            public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
            {
                context.BindingParameters.Add(this);
                return context.BuildInnerChannelFactory<TChannel>();
            }
            public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
            {
                context.BindingParameters.Add(this);
                return context.BuildInnerChannelListener<TChannel>();
            }
            public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
            {
                context.BindingParameters.Add(this);
                return context.CanBuildInnerChannelFactory<TChannel>();
            }
            public override bool CanBuildChannelListener<TChannel>(BindingContext context)
            {
                context.BindingParameters.Add(this);
                return context.CanBuildInnerChannelListener<TChannel>();
            }
            public override MessageEncoderFactory CreateMessageEncoderFactory()
            {
                return new CompressEncoderFactory(innerMessageEncodingBindingElement,algorithm);
            }
            public override T GetProperty<T>(BindingContext context)  
            {
                if (typeof(T) == typeof(XmlDictionaryReaderQuotas))
                {
                    return this.readerQuotas as T;
                }
                return base.GetProperty<T>(context);
    
            }
            public override MessageVersion MessageVersion
            {
                get
                {
                    return innerMessageEncodingBindingElement.MessageVersion;
                }
                set
                {
                    innerMessageEncodingBindingElement.MessageVersion = value;
                }
            }
            
            public override BindingElement Clone()
            {
                return new CompressEncodingBindingElement(innerMessageEncodingBindingElement,algorithm);
            } 
        }
    }

    Finally, we can use CustomBinding to create the host and the client.

    Host:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.ServiceModel;
    using Robin_Wcf_CustomMessageEncoder_SvcLib;
    using System.ServiceModel.Channels;
    using RobinLib;
    
    namespace Robin_Wcf_CustomMessageEncoder_Host
    {
        class Program
        {
            static void Main(string[] args)
            {
                // Service address
                Uri baseAddress = new Uri("http://127.0.0.1:8081/Robin_Wcf_Formatter");
                ServiceHost host = new ServiceHost(typeof(Service1), new Uri[] { baseAddress });
                // Service binding
                ICollection<BindingElement> bindingElements = new List<BindingElement>();
                HttpTransportBindingElement httpBindingElement = new HttpTransportBindingElement();
                CompressEncodingBindingElement textBindingElement = new CompressEncodingBindingElement(new TextMessageEncodingBindingElement(),CompressAlgorithm.Deflate);
                bindingElements.Add(textBindingElement);
                bindingElements.Add(httpBindingElement);
                CustomBinding bind = new CustomBinding(bindingElements);  
                host.AddServiceEndpoint(typeof(IService1), bind, "");
                if (host.Description.Behaviors.Find<System.ServiceModel.Description.ServiceMetadataBehavior>() == null)
                {
                    System.ServiceModel.Description.ServiceMetadataBehavior svcMetaBehavior = new System.ServiceModel.Description.ServiceMetadataBehavior();
                    svcMetaBehavior.HttpGetEnabled = true;
                    svcMetaBehavior.HttpGetUrl = new Uri("http://127.0.0.1:8001/Mex");
                    host.Description.Behaviors.Add(svcMetaBehavior);
                }
                host.Opened += new EventHandler(delegate(object obj, EventArgs e)
                {
                    Console.WriteLine("Service started!");
                }); 
                host.Open();
                Console.Read();
            }
        }
    }

    Client:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using RobinLib;
    using System.ServiceModel.Channels;
    using Robin_Wcf_CustomMessageEncoder_ClientApp.ServiceReference1;
    
    namespace Robin_Wcf_CustomMessageEncoder_ClientApp
    {
        class Program
        {
            static void Main(string[] args)
            {
                System.Threading.Thread.Sleep(5300);
                ICollection<BindingElement> bindingElements = new List<BindingElement>();
                HttpTransportBindingElement httpBindingElement = new HttpTransportBindingElement();
                CompressEncodingBindingElement textBindingElement = new CompressEncodingBindingElement(new BinaryMessageEncodingBindingElement(), CompressAlgorithm.GZip);
                bindingElements.Add(textBindingElement);
                bindingElements.Add(httpBindingElement); 
                CustomBinding bind = new CustomBinding(bindingElements);  
                ServiceReference1.IService1 svc = new ServiceReference1.Service1Client(bind, new System.ServiceModel.EndpointAddress("http://127.0.0.1:8081/Robin_Wcf_Formatter"));
                string pres = svc.GetData(10);
                Console.WriteLine(pres);
                CompositeType ct = svc.GetDataUsingDataContract(new CompositeType());
                System.IO.MemoryStream ms = new System.IO.MemoryStream();
                for (int i = 0; i < 1000000; i++)
                {
                    byte[] buffer = BitConverter.GetBytes(i);
                    ms.Write(buffer, 0, buffer.Length);
                }
                System.IO.Stream stream = svc.GetStream(ms);
                Console.Read();
            }
        }
    }

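    The inner MessageEncoder and the compression algorithm are chosen on the line CompressEncodingBindingElement textBindingElement = new CompressEncodingBindingElement(new BinaryMessageEncodingBindingElement(), CompressAlgorithm.GZip); and can be changed there. The host and the client must use the same inner encoder and the same algorithm. The two listings above differ (Text with Deflate on the host, Binary with GZip on the client), so align them before running.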
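    Because both ends have to agree on the encoder stack, one option is to build the binding in a single helper that host and client share. The sketch below is such a helper under that assumption; SharedBinding is a hypothetical name, and a built-in encoding element stands in for the custom one so the example is self-contained.

```csharp
using System.ServiceModel.Channels;

public static class SharedBinding
{
    // Builds the binding from one encoding element so that host and client
    // cannot drift apart. The encoding element goes first; the transport
    // element is always the last one in the stack.
    public static CustomBinding Build(MessageEncodingBindingElement encoding)
    {
        return new CustomBinding(encoding, new HttpTransportBindingElement());
    }
}
```

    In the article's setup, the argument would be the CompressEncodingBindingElement wrapping the chosen inner encoder, and both Main methods would call the same helper with the same arguments.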

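    One last remark: not every byte array gets smaller when compressed; the gain is only significant for text-like data. Running the program with TextMessageEncodingBindingElement as the inner encoder gives:

    [screenshot: console output with the Text inner encoder]

    Here the compression is very effective.

    With BinaryMessageEncodingBindingElement as the inner encoder, the gain is no longer noticeable, and compression can even be counterproductive:

    [screenshot: console output with the Binary inner encoder]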
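    The effect can be reproduced without WCF. The sketch below compresses a repetitive XML-like text and a block of pseudo-random bytes with GZipStream and returns the resulting sizes. The random block is only a stand-in for dense data that has little redundancy left; it is an assumption for illustration, not the actual output of the binary encoder, and the class name CompressionRatioDemo is made up.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

public static class CompressionRatioDemo
{
    // Returns the size of 'data' after GZip compression.
    public static int GZipSize(byte[] data)
    {
        using (MemoryStream ms = new MemoryStream())
        {
            using (GZipStream gzip = new GZipStream(ms, CompressionMode.Compress, true))
            {
                gzip.Write(data, 0, data.Length);
            }
            // The GZipStream is closed here, so all compressed bytes are in ms.
            return (int)ms.Length;
        }
    }

    // Highly repetitive, text-like input.
    public static byte[] RepetitiveText()
    {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 2000; i++)
        {
            sb.Append("<Order><Id>42</Id><Name>sample</Name></Order>");
        }
        return Encoding.UTF8.GetBytes(sb.ToString());
    }

    // Pseudo-random input with a fixed seed, so runs are repeatable.
    public static byte[] RandomBytes(int count)
    {
        byte[] data = new byte[count];
        new Random(12345).NextBytes(data);
        return data;
    }
}
```

    Comparing GZipSize(RepetitiveText()) with the input length shows a large reduction, while GZipSize(RandomBytes(100000)) stays close to, or above, the input length.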

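    If you need compressed transport, you can download the project, reference the RobinLib.dll in it, and use the custom Binding.

    Project files: /jillzhang/Robin_Wcf_CustomMessageEncoder.rar

    The next article will demonstrate transport with symmetric encryption.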

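    Author: jillzhang
    Source: http://jillzhang.cnblogs.com/
    Copyright of this article is shared by the author and cnblogs (博客园). Reposting is welcome, but unless the author agrees otherwise this notice must be kept and a clearly visible link to the original must be given on the article page; otherwise the right to pursue legal liability is reserved.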
  • Original article: https://www.cnblogs.com/jillzhang/p/1711079.html