在前两讲《初探.Net Remoting服务端 Loading Remoting配置内容的过程》《初探.Net Remoting客户端 Loading Remoting配置内容的过程》中,我已经分析了Remoting的Sink机制。接下来提供一个具体的范例:CompressionSink(原始SourceCode源于《Advanced .NET Remoting》第1版)。CompressionSink在客户端和服务端各自插入一个负责数据压缩/解压缩的Sink,目的是减少大数据量传递对网络带宽的占用,提高传输效率。下载SourceCode。BTW,这个压缩Sink相对比较稳定,大家可以在各自的项目中放心使用。:-)
详细设计:
提供一个Assembly: CompressionSink.dll
它包括:
客户端:
CompressionSink.CompressionClientSinkProvider类和CompressionSink.CompressionClientSink类
服务端:
CompressionSink.CompressionServerSinkProvider类和CompressionSink.CompressionServerSink类
压缩类:CompressionHelper
压缩内核:NZipLib库。
客户端的配置文件 :
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.runtime.remoting>
<application>
<channels>
<!-- Client side of the HTTP channel. -->
<channel ref="http">
<clientProviders>
<!-- On the client the SOAP formatter is listed first and the compression provider second. -->
<formatter ref="soap" />
<!-- Custom client sink provider, given as "type name, assembly name". -->
<provider type="CompressionSink.CompressionClientSinkProvider, CompressionSink" />
</clientProviders>
</channel>
</channels>
<client>
<!-- Remote object used by the client; the URL points at port 5555 and the SomeSAO.soap object URI. -->
<wellknown type="Service.SomeSAO, Service" url="http://localhost:5555/SomeSAO.soap" />
</client>
</application>
</system.runtime.remoting>
</configuration>
服务端的配置文件 :
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.runtime.remoting>
<application>
<channels>
<!-- Server side of the HTTP channel, listening on port 5555. -->
<channel ref="http" port="5555">
<serverProviders>
<!-- On the server the compression provider is listed first and the SOAP formatter second. -->
<!-- Custom server sink provider, given as "type name, assembly name". -->
<provider type="CompressionSink.CompressionServerSinkProvider, CompressionSink" />
<formatter ref="soap"/>
</serverProviders>
</channel>
</channels>
<service>
<!-- Publishes Service.SomeSAO as a Singleton under the object URI SomeSAO.soap. -->
<wellknown mode="Singleton" type="Service.SomeSAO, Service" objectUri="SomeSAO.soap" />
</service>
</application>
</system.runtime.remoting>
</configuration>
/// <summary>
/// Client-side sink provider that places a <c>CompressionClientSink</c> in front of
/// the sinks created by the next provider in the chain.
/// </summary>
public class CompressionClientSinkProvider : IClientChannelSinkProvider
{
    // Provider that follows this one in the chain; assigned through the Next property.
    private IClientChannelSinkProvider _nextProvider;

    /// <summary>
    /// Creates the provider. Both arguments are accepted but not used.
    /// </summary>
    /// <param name="properties">Provider properties (ignored).</param>
    /// <param name="providerData">Provider data (ignored).</param>
    public CompressionClientSinkProvider(IDictionary properties, ICollection providerData)
    {
        // Intentionally empty: no configuration values are consumed yet.
    }

    /// <summary>
    /// Gets or sets the next provider in the client sink provider chain.
    /// </summary>
    public IClientChannelSinkProvider Next
    {
        get { return _nextProvider; }
        set { _nextProvider = value; }
    }

    /// <summary>
    /// Builds the remainder of the sink chain through the next provider and
    /// returns a compression sink wrapped around it.
    /// </summary>
    /// <param name="channel">Channel the sink chain is created for; passed on unchanged.</param>
    /// <param name="url">Target URL; passed on unchanged.</param>
    /// <param name="remoteChannelData">Remote channel data; passed on unchanged.</param>
    /// <returns>A new <c>CompressionClientSink</c> whose next sink is the downstream chain.</returns>
    public IClientChannelSink CreateSink(IChannelSender channel, string url, object remoteChannelData)
    {
        // The downstream sinks have to exist before this sink can be placed in front of them.
        IClientChannelSink downstream = _nextProvider.CreateSink(channel, url, remoteChannelData);

        return new CompressionClientSink(downstream);
    }
}
1
/// <summary>
/// Client-side channel sink that compresses request streams before handing them to
/// the next sink and decompresses response streams on the way back, using
/// <c>CompressionHelper</c>.
/// </summary>
public class CompressionClientSink : BaseChannelSinkWithProperties, IClientChannelSink
{
    // Sink that receives the compressed request; set once in the constructor.
    private readonly IClientChannelSink _nextSink;

    /// <summary>
    /// Creates the sink.
    /// </summary>
    /// <param name="next">The sink that follows this one in the chain.</param>
    public CompressionClientSink(IClientChannelSink next)
    {
        _nextSink = next;
    }

    /// <summary>
    /// Gets the sink that follows this one in the chain.
    /// </summary>
    public IClientChannelSink NextChannelSink
    {
        get { return _nextSink; }
    }

    /// <summary>
    /// Compresses the request stream, registers this sink on the sink stack and
    /// forwards the asynchronous request to the next sink.
    /// </summary>
    public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
        IMessage msg,
        ITransportHeaders headers,
        Stream stream)
    {
        Stream compressedRequest = CompressionHelper.getCompressedStreamCopy(stream);

        // Push with a null state so AsyncProcessResponse is invoked on this sink later.
        sinkStack.Push(this, null);
        _nextSink.AsyncProcessRequest(sinkStack, msg, headers, compressedRequest);
    }

    /// <summary>
    /// Decompresses the asynchronous response stream and passes it further up the sink stack.
    /// </summary>
    public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
        object state,
        ITransportHeaders headers,
        Stream stream)
    {
        Stream inflatedResponse = CompressionHelper.getUncompressedStreamCopy(stream);

        sinkStack.AsyncProcessResponse(headers, inflatedResponse);
    }

    /// <summary>
    /// Delegates request stream creation to the next sink.
    /// </summary>
    /// <returns>Whatever the next sink returns for the same arguments.</returns>
    public Stream GetRequestStream(IMessage msg, ITransportHeaders headers)
    {
        return _nextSink.GetRequestStream(msg, headers);
    }

    /// <summary>
    /// Synchronous call path: compresses the request, lets the next sink process it,
    /// and decompresses the response stream returned by that sink.
    /// </summary>
    /// <param name="msg">The message being sent; passed on unchanged.</param>
    /// <param name="requestHeaders">Request transport headers; passed on unchanged.</param>
    /// <param name="requestStream">Uncompressed request stream.</param>
    /// <param name="responseHeaders">Response headers as produced by the next sink.</param>
    /// <param name="responseStream">Decompressed copy of the next sink's response stream.</param>
    public void ProcessMessage(IMessage msg,
        ITransportHeaders requestHeaders,
        Stream requestStream,
        out ITransportHeaders responseHeaders,
        out Stream responseStream)
    {
        Stream compressedRequest = CompressionHelper.getCompressedStreamCopy(requestStream);

        Stream rawResponse;
        _nextSink.ProcessMessage(msg,
            requestHeaders,
            compressedRequest,
            out responseHeaders,
            out rawResponse);

        responseStream = CompressionHelper.getUncompressedStreamCopy(rawResponse);
    }
}
1
/// <summary>
/// Server-side sink provider that places a <c>CompressionServerSink</c> in front of
/// the sinks created by the next provider in the chain.
/// </summary>
public class CompressionServerSinkProvider : IServerChannelSinkProvider
{
    // Provider that follows this one in the chain; assigned through the Next property.
    private IServerChannelSinkProvider _nextProvider;

    /// <summary>
    /// Creates the provider. Both arguments are accepted but not used.
    /// </summary>
    /// <param name="properties">Provider properties (ignored).</param>
    /// <param name="providerData">Provider data (ignored).</param>
    public CompressionServerSinkProvider(IDictionary properties, ICollection providerData)
    {
        // Intentionally empty: no configuration values are consumed yet.
    }

    /// <summary>
    /// Gets or sets the next provider in the server sink provider chain.
    /// </summary>
    public IServerChannelSinkProvider Next
    {
        get { return _nextProvider; }
        set { _nextProvider = value; }
    }

    /// <summary>
    /// Builds the remainder of the sink chain through the next provider and
    /// returns a compression sink wrapped around it.
    /// </summary>
    /// <param name="channel">Channel the sink chain is created for; passed on unchanged.</param>
    /// <returns>A new <c>CompressionServerSink</c> whose next sink is the downstream chain.</returns>
    public IServerChannelSink CreateSink(IChannelReceiver channel)
    {
        // The downstream sinks have to exist before this sink can be placed in front of them.
        IServerChannelSink downstream = _nextProvider.CreateSink(channel);

        return new CompressionServerSink(downstream);
    }

    /// <summary>
    /// Part of the provider interface; this provider contributes no channel data.
    /// </summary>
    /// <param name="channelData">Channel data store (left untouched).</param>
    public void GetChannelData(IChannelDataStore channelData)
    {
        // Intentionally empty.
    }
}
using System;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Messaging;
using System.IO;
namespace CompressionSink


{
/// <summary>
/// Server-side channel sink that decompresses incoming request streams before
/// handing them to the next sink and compresses the response streams on the way
/// back, using <c>CompressionHelper</c>.
/// </summary>
public class CompressionServerSink : BaseChannelSinkWithProperties,
    IServerChannelSink
{
    // Sink that receives the decompressed request; set once in the constructor.
    private readonly IServerChannelSink _nextSink;

    /// <summary>
    /// Creates the sink.
    /// </summary>
    /// <param name="next">The sink that follows this one in the chain.</param>
    public CompressionServerSink(IServerChannelSink next)
    {
        _nextSink = next;
    }

    /// <summary>
    /// Gets the sink that follows this one in the chain.
    /// </summary>
    public IServerChannelSink NextChannelSink
    {
        get { return _nextSink; }
    }

    /// <summary>
    /// Compresses the asynchronous response stream and passes it on to the sink stack.
    /// </summary>
    public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
        object state,
        IMessage msg,
        ITransportHeaders headers,
        Stream stream)
    {
        // compressing the response
        stream = CompressionHelper.getCompressedStreamCopy(stream);
        // forwarding to the stack for further processing
        sinkStack.AsyncProcessResponse(msg, headers, stream);
    }

    /// <summary>
    /// This sink does not supply a response stream of its own.
    /// </summary>
    /// <returns>Always <c>null</c>.</returns>
    public Stream GetResponseStream(IServerResponseChannelSinkStack sinkStack,
        object state,
        IMessage msg,
        ITransportHeaders headers)
    {
        return null;
    }

    /// <summary>
    /// Decompresses the request, lets the next sink process it, and compresses the
    /// response stream when the call completed synchronously.
    /// </summary>
    /// <param name="sinkStack">Sink stack this sink pushes itself onto.</param>
    /// <param name="requestMsg">Request message; passed on unchanged.</param>
    /// <param name="requestHeaders">Request transport headers; passed on unchanged.</param>
    /// <param name="requestStream">Compressed request stream.</param>
    /// <param name="responseMsg">Response message as produced by the next sink.</param>
    /// <param name="responseHeaders">Response headers as produced by the next sink.</param>
    /// <param name="responseStream">
    /// Compressed copy of the next sink's response stream, or the next sink's value
    /// unchanged (possibly <c>null</c>) when there is no synchronous response.
    /// </param>
    /// <returns>The processing status reported by the next sink.</returns>
    public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
        IMessage requestMsg,
        ITransportHeaders requestHeaders,
        Stream requestStream,
        out IMessage responseMsg,
        out ITransportHeaders responseHeaders,
        out Stream responseStream)
    {
        // uncompressing the request
        Stream localrequestStream =
            CompressionHelper.getUncompressedStreamCopy(requestStream);

        // pushing onto stack and forwarding the call
        sinkStack.Push(this, null);
        Stream localresponseStream;
        ServerProcessing srvProc = _nextSink.ProcessMessage(sinkStack,
            requestMsg,
            requestHeaders,
            localrequestStream,
            out responseMsg,
            out responseHeaders,
            out localresponseStream);

        // Only a synchronously completed call has a response stream to compress.
        // For ServerProcessing.OneWay and ServerProcessing.Async no response is
        // produced here, and compressing a null stream would throw a
        // NullReferenceException inside CompressionHelper.
        // NOTE(review): the ServerProcessing.Async documentation says a sink must store
        // its response data on the stack; confirm whether sinkStack.Store is needed
        // here so AsyncProcessResponse runs for async dispatch.
        if (srvProc == ServerProcessing.Complete && localresponseStream != null)
        {
            responseStream =
                CompressionHelper.getCompressedStreamCopy(localresponseStream);
        }
        else
        {
            responseStream = localresponseStream;
        }

        // returning status information
        return srvProc;
    }
}
}
1
/// <summary>
/// Helpers that copy a stream into a new in-memory stream while compressing or
/// decompressing its content with the NZipLib deflater/inflater streams.
/// </summary>
public class CompressionHelper
{
    // Size of the scratch buffer used when copying between streams.
    private const int BufferSize = 4096;

    /// <summary>
    /// Reads <paramref name="inStream"/> to its end and returns a new
    /// <see cref="MemoryStream"/> holding the compressed bytes.
    /// </summary>
    /// <param name="inStream">Stream whose remaining content is compressed.</param>
    /// <returns>A memory stream with the compressed data, positioned at offset 0.</returns>
    public static Stream getCompressedStreamCopy(Stream inStream)
    {
        MemoryStream compressedData = new MemoryStream();
        Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
        DeflaterOutputStream deflaterStream = new DeflaterOutputStream(compressedData, deflater);

        Pump(inStream, deflaterStream);
        deflaterStream.Finish();

        // Rewinding is essential: without it the following sinks fail when they read
        // this stream. The original sample omitted this step, which made it fail at runtime.
        compressedData.Seek(0, SeekOrigin.Begin);
        return compressedData;
    }

    /// <summary>
    /// Reads compressed data from <paramref name="inStream"/> to its end and returns a
    /// new <see cref="MemoryStream"/> holding the decompressed bytes.
    /// </summary>
    /// <param name="inStream">Stream containing compressed data.</param>
    /// <returns>A memory stream with the decompressed data, positioned at offset 0.</returns>
    public static Stream getUncompressedStreamCopy(Stream inStream)
    {
        InflaterInputStream inflaterStream = new InflaterInputStream(inStream);
        MemoryStream plainData = new MemoryStream();

        Pump(inflaterStream, plainData);
        inflaterStream.Close();

        // Rewinding is essential: without it the following sinks fail when they read
        // this stream. The original sample omitted this step, which made it fail at runtime.
        plainData.Seek(0, SeekOrigin.Begin);
        return plainData;
    }

    // Copies everything that can still be read from source into target,
    // stopping at the first read that returns no bytes.
    private static void Pump(Stream source, Stream target)
    {
        byte[] chunk = new byte[BufferSize];
        for (int read = source.Read(chunk, 0, chunk.Length);
             read > 0;
             read = source.Read(chunk, 0, chunk.Length))
        {
            target.Write(chunk, 0, read);
        }
    }
}
BTW,这个Sink还可以扩展:比如先判断待压缩Stream的大小,较大时才压缩,否则不压缩(可以在requestHeaders和responseHeaders中添加"是否已经压缩"的标记,让对端据此决定是否需要解压)。