zoukankan      html  css  js  c++  java
  • 实现一个压缩Remoting传输数据的Sink:CompressionSink (转载)

    在前两讲《初探.Net Remoting服务端 Loading Remtoing配置内容的过程 》《初探.Net Remoting客户端 Loading Remtoing配置内容的过程 》中,我已经分析了Remoting 的Sink机制,接下来,就提供一个具体的范例:CompressionSink(原始SourceCode源于Advanced .Net Remoting 1StED)。 CompressionSink通过在客户端和服务端各自插入一个数据压缩-解压缩的Sink。目的是希望减少大数据量传递对网络带宽的占用,提高传输效率。下载SourceCode ,BTW,这个压缩Sink相对比较稳定,大家可以在各自的项目中放心使用。:-)



    详细设计:
    提供一个Assembly: CompressionSink.dll
    它包括:
        客户端:
            CompressionSink.CompressionClientSinkProvider类CompressionSink.CompressionClientSink类
        服务端:
            CompressionSink.CompressionServerSinkProvider类CompressionSink.CompressionServerSink类
        压缩类:CompressionHelper
        压缩内核:NZipLib库。

    客户端的配置文件 :

    <?xml version="1.0" encoding="utf-8" ?>

    <configuration>

        <system.runtime.remoting>

            <application>

                <channels>

                    <channel ref="http">

                        <clientProviders>

                            <formatter ref="soap" />

                            <provider type="CompressionSink.CompressionClientSinkProvider, CompressionSink" />

                        </clientProviders>

                    </channel>

                </channels>

                 <client>

                     <wellknown type="Service.SomeSAO, Service"  url="http://localhost:5555/SomeSAO.soap" />

                 </client>

            </application>

        </system.runtime.remoting>

    </configuration>

    服务端的配置文件 : 

    <?xml version="1.0" encoding="utf-8" ?>

    <configuration>

        <system.runtime.remoting>

            <application>

                <channels>

                    <channel ref="http" port="5555">

                        <serverProviders>

                            <provider type="CompressionSink.CompressionServerSinkProvider, CompressionSink" />

                            <formatter ref="soap"/>

                        </serverProviders>

                    </channel>

                </channels>

                <service>

                    <wellknown mode="Singleton"  type="Service.SomeSAO, Service"   objectUri="SomeSAO.soap" />

                </service>

            </application>

        </system.runtime.remoting>

    </configuration>

     

    public class CompressionClientSinkProvider: IClientChannelSinkProvider
        
    {
            
    private IClientChannelSinkProvider _nextProvider;

            
    public CompressionClientSinkProvider(IDictionary properties, ICollection providerData) 
            
    {
                
    // not yet needed
            }


            
    public IClientChannelSinkProvider Next
            
    {
                
    get {
                    
    return _nextProvider;
                }

                
    set {
                    _nextProvider 
    = value;
                }

            }


            
    public IClientChannelSink CreateSink(IChannelSender channel, string url, object remoteChannelData) 
            
    {
                
    // create other sinks in the chain
                IClientChannelSink next = _nextProvider.CreateSink(channel,
                    url,
                    remoteChannelData);    
        
                
    // put our sink on top of the chain and return it                
                return new CompressionClientSink(next);
            }

        }



     1public class CompressionClientSink: BaseChannelSinkWithProperties, 
     2                                        IClientChannelSink
     3    {
     4        private IClientChannelSink _nextSink;
     5
     6        public CompressionClientSink(IClientChannelSink next) 
     7        {
     8            _nextSink = next;
     9        }

    10
    11        public IClientChannelSink NextChannelSink 
    12        {
    13            get {
    14                return _nextSink;
    15            }

    16        }

    17
    18
    19        public void AsyncProcessRequest(IClientChannelSinkStack sinkStack, 
    20                                        IMessage msg, 
    21                                        ITransportHeaders headers, 
    22                                        Stream stream) 
    23        {
    24
    25
    26            // generate a compressed stream using NZipLib
    27            stream = CompressionHelper.getCompressedStreamCopy(stream);
    28
    29            // push onto stack and forward the request
    30            sinkStack.Push(this,null);
    31            _nextSink.AsyncProcessRequest(sinkStack,msg,headers,stream);
    32        }

    33
    34
    35        public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack, 
    36                                            object state, 
    37                                            ITransportHeaders headers, 
    38                                            Stream stream) 
    39        {
    40
    41            // deflate the response
    42            stream = 
    43                CompressionHelper.getUncompressedStreamCopy(stream);
    44
    45            // forward the request
    46            sinkStack.AsyncProcessResponse(headers,stream);
    47        }

    48
    49
    50        public Stream GetRequestStream(IMessage msg, 
    51                                       ITransportHeaders headers) 
    52        {
    53            return _nextSink.GetRequestStream(msg, headers);
    54        }

    55
    56
    57        public void ProcessMessage(IMessage msg, 
    58                                   ITransportHeaders requestHeaders, 
    59                                   Stream requestStream, 
    60                                   out ITransportHeaders responseHeaders, 
    61                                   out Stream responseStream) 
    62        {
    63            // generate a compressed stream using NZipLib
    64
    65            Stream localrequestStream  = 
    66                CompressionHelper.getCompressedStreamCopy(requestStream);
    67
    68            Stream localresponseStream;
    69            // forward the call to the next sink
    70            _nextSink.ProcessMessage(msg,
    71                                     requestHeaders,
    72                                     localrequestStream, 
    73                                     out responseHeaders, 
    74                                     out localresponseStream);
    75
    76            // deflate the response
    77            responseStream = 
    78                CompressionHelper.getUncompressedStreamCopy(localresponseStream);
    79
    80        }

    81    }
     1public class CompressionServerSinkProvider: IServerChannelSinkProvider
     2    {
     3        private IServerChannelSinkProvider _nextProvider;
     4
     5        public CompressionServerSinkProvider(IDictionary properties, ICollection providerData) 
     6        {
     7            // not yet needed
     8        }

     9
    10        public IServerChannelSinkProvider Next
    11        {
    12            get {
    13                return _nextProvider; 
    14            }

    15            set {
    16                _nextProvider = value;
    17            }

    18        }

    19
    20        public IServerChannelSink CreateSink(IChannelReceiver channel)
    21        {
    22            // create other sinks in the chain
    23            IServerChannelSink next = _nextProvider.CreateSink(channel);                
    24    
    25            // put our sink on top of the chain and return it                
    26            return new CompressionServerSink(next);
    27        }

    28
    29        public void GetChannelData(IChannelDataStore channelData)
    30        {
    31            // not yet needed
    32        }

    33
    34    }
    using System;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Remoting.Messaging;
    using System.IO;

    namespace CompressionSink
    {

        
    public class CompressionServerSink: BaseChannelSinkWithProperties,
            IServerChannelSink
        
    {

            
    private IServerChannelSink _nextSink;

            
    public CompressionServerSink(IServerChannelSink next) 
            
    {
                _nextSink 
    = next;
            }


            
    public IServerChannelSink NextChannelSink 
            
    {
                
    get 
                
    {
                    
    return _nextSink;
                }

            }


            
    public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack, 
                
    object state, 
                IMessage msg, 
                ITransportHeaders headers, 
                Stream stream) 
            
    {
                
    // compressing the response
                stream=CompressionHelper.getCompressedStreamCopy(stream);

                
    // forwarding to the stack for further processing
                sinkStack.AsyncProcessResponse(msg,headers,stream);
            }


            
    public Stream GetResponseStream(IServerResponseChannelSinkStack sinkStack, 
                
    object state, 
                IMessage msg, 
                ITransportHeaders headers)
            
    {
                
    return null;
            }


            
    public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack, 
                IMessage requestMsg, 
                ITransportHeaders requestHeaders,
                Stream requestStream, 
                
    out IMessage responseMsg, 
                
    out ITransportHeaders responseHeaders, 
                
    out Stream responseStream) 
            
    {
                
    // uncompressing the request
                Stream  localrequestStream = 
                    CompressionHelper.getUncompressedStreamCopy(requestStream);

                
    // pushing onto stack and forwarding the call
                sinkStack.Push(this,null);

                Stream localresponseStream;
                ServerProcessing srvProc 
    = _nextSink.ProcessMessage(sinkStack,
                    requestMsg,
                    requestHeaders,
                    localrequestStream,
                    
    out responseMsg,
                    
    out responseHeaders,
                    
    out localresponseStream);

                
    // compressing the response
                responseStream=
                    CompressionHelper.getCompressedStreamCopy(localresponseStream);

                
    // returning status information
                return srvProc;
            }

        }

    }

     1public class CompressionHelper 
     2    {
     3
     4        /**//// <summary>
     5        /// refactor  by zendy
     6        /// </summary>
     7        /// <param name="inStream"></param>
     8        /// <returns></returns>

     9        public static Stream getCompressedStreamCopy(Stream inStream) 
    10        {
    11            MemoryStream outStream = new MemoryStream();
    12            Deflater mDeflater = new Deflater(Deflater.BEST_COMPRESSION);
    13            DeflaterOutputStream compressStream = new DeflaterOutputStream(outStream,mDeflater);
    14
    15            byte[] buf = new Byte[4096];
    16            int cnt = inStream.Read(buf,0,4096);
    17            while (cnt>0{
    18                compressStream.Write(buf,0,cnt);
    19                cnt = inStream.Read(buf,0,4096);
    20            }

    21            compressStream.Finish();
    22            //modify by zendy //这个设置非常重要,否则会导致后续Sink在处理该stream时失败,在原来的源码中就是因为没有这个处理导致程序运行失败
    23            outStream.Seek(0,SeekOrigin.Begin);
    24            return outStream;
    25        }

    26
    27        /**//// <summary>
    28        /// refactor  by zendy
    29        /// </summary>
    30        /// <param name="inStream"></param>
    31        /// <returns></returns>

    32        public static Stream getUncompressedStreamCopy(Stream inStream) 
    33        {
    34            InflaterInputStream unCompressStream = new InflaterInputStream(inStream); 
    35            MemoryStream outStream = new MemoryStream();
    36            int mSize;
    37            Byte[] mWriteData = new Byte[4096];
    38            while(true)
    39            {
    40                mSize = unCompressStream.Read(mWriteData, 0, mWriteData.Length);
    41                if (mSize > 0)
    42                {
    43                    outStream.Write(mWriteData, 0, mSize);
    44                }

    45                else
    46                {
    47                    break;
    48                }

    49            }

    50            unCompressStream.Close();
    51            //modify by zendy//这个设置非常重要,否则会导致后续Sink在处理该stream时失败,,在原来的源码中就是因为没有这个处理导致程序运行失败
    52            outStream.Seek(0,SeekOrigin.Begin);
    53            return outStream;
    54        }

    55    }


    BTW,这个Sink还可以扩展,比如判断需要压缩Stream的大小,如果很大,就压缩,否则不压缩(可以在responseHeaders和requestHeaders添加是否 已经压缩的标记)

  • 相关阅读:
    Html
    git和github简易教程
    Java基础
    如何学习一门语言
    leetcode题解(持续更新)
    浅谈安全威胁+引子
    内网渗透基础
    Java运算符
    Java修饰符
    Java变量类型
  • 原文地址:https://www.cnblogs.com/daitengfei/p/366366.html
Copyright © 2011-2022 走看看