MSMQ实现自定义序列化存储
在使用MSMQ的时候一般只会使用默认的XML序列化来对消息进行存储,但XML存储的缺点是序列化体积相对比较大和效率上有点低.其实.net提供非常简单的方式让我们实现不同序列化方式来存储MSMQ信息,如json,protobuf等.为了能够让开发人员实现自定义序列化的消息存储,.NET提供了IMessageFormatter这样一个接口,只需要简单地实现这个接口就可以对MSMQ的消息进行处理.以下讲解如何实现json和protobuf的messageformater.
IMessageFormatter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
// 摘要: // 从“消息队列”消息体序列化或反序列化对象。 [TypeConverter( typeof (MessageFormatterConverter))] public interface IMessageFormatter : ICloneable { // 摘要: // 在类中实现时,确定格式化程序是否可以反序列化消息的内容。 // // 参数: // message: // 要检查的 System.Messaging.Message。 // // 返回结果: // 如果格式化程序可以反序列化消息,则为 true;否则为 false。 bool CanRead(Message message); // // 摘要: // 在类中实现时,读取给定消息中的内容并创建包含该消息中的数据的对象。 // // 参数: // message: // The System.Messaging.Message to deserialize. // // 返回结果: // 反序列化的消息。 object Read(Message message); // // 摘要: // 在类中实现时,将对象序列化到消息体中。 // // 参数: // message: // System.Messaging.Message,它将包含序列化的对象。 // // obj: // 要序列化到消息中的对象。 void Write(Message message, object obj); } |
接口非常简单,主要规范了MSMQ写入和读取的规则.
Json Formater
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class JsonFormater<T> : IMessageFormatter { public bool CanRead(Message message) { return message.BodyStream != null && message.BodyStream.Length > 0; } [ThreadStatic] private static byte [] mBuffer; public object Read(Message message) { if (mBuffer== null ) mBuffer = new byte [4096]; int count =( int )message.BodyStream.Length; message.BodyStream.Read(mBuffer, 0, count); return Newtonsoft.Json.JsonConvert.DeserializeObject(Encoding.UTF8.GetString(mBuffer, 0, count), typeof (T)); } [System.ThreadStatic] private static System.IO.MemoryStream mStream; public void Write(Message message, object obj) { if (mStream == null ) mStream = new System.IO.MemoryStream(4096); mStream.Position = 0; mStream.SetLength(4095); string value = Newtonsoft.Json.JsonConvert.SerializeObject(obj); int count = Encoding.UTF8.GetBytes(value, 0, value.Length, mStream.GetBuffer(), 0); mStream.SetLength(count); message.BodyStream = mStream; } public object Clone() { return this ; } } |
Protobuf Formater
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public class ProtobufFormater<T> : IMessageFormatter { public bool CanRead(Message message) { return message.BodyStream != null && message.BodyStream.Length > 0; } public object Read(Message message) { return ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(message.BodyStream, null , typeof (T)); } [System.ThreadStatic] private static System.IO.MemoryStream mStream ; public void Write(Message message, object obj) { if (mStream == null ) mStream = new System.IO.MemoryStream(4096); mStream.Position = 0; mStream.SetLength(0); ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(mStream, obj); message.BodyStream = mStream; } public object Clone() { return this ; } } |
使用Formater
使有自定义Formater比较简单,只需要指定MessageQueue的Formatter属性即可.
1
2
|
MessageQueue queue = new MessageQueue( @".\private$\Test" ); queue.Formatter = new JsonFormater<User>(); |
简单的测性能测试
针对json,protobuf这两种自定义序列化和默认的XML序列化性能上有多大差异,各自进行100000条写入和读取的耗时情况.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch(); sw.Reset(); sw.Start(); for ( int i = 0; i < 100000; i++) { queue.Send(user); } sw.Stop(); Console.WriteLine( "MSMQ send xml formater:" + sw.Elapsed.TotalMilliseconds + "ms" ); sw.Reset(); sw.Start(); for ( int i = 0; i < 100000; i++) { User result = (User)queue.Receive().Body; } sw.Stop(); Console.WriteLine( "MSMQ receive xml formater:" + sw.Elapsed.TotalMilliseconds + "ms" ); queue.Formatter = new JsonFormater<User>(); sw.Reset(); sw.Start(); for ( int i = 0; i < 100000; i++) { queue.Send(user); } sw.Stop(); Console.WriteLine( "MSMQ send Json formater:" + sw.Elapsed.TotalMilliseconds + "ms" ); sw.Reset(); sw.Start(); for ( int i = 0; i < 100000; i++) { User result = (User)queue.Receive().Body; } sw.Stop(); Console.WriteLine( "MSMQ receive json formater:" + sw.Elapsed.TotalMilliseconds + "ms" ); queue.Formatter = new ProtobufFormater<User>(); sw.Reset(); sw.Start(); for ( int i = 0; i < 100000; i++) { queue.Send(user); } sw.Stop(); Console.WriteLine( "MSMQ send Protobuf formater:" + sw.Elapsed.TotalMilliseconds+ "ms" ); sw.Reset(); sw.Start(); for ( int i = 0; i < 100000; i++) { User result = (User)queue.Receive().Body; } sw.Stop(); Console.WriteLine( "MSMQ receive Protobuf formater:" + sw.Elapsed.TotalMilliseconds + "ms" ); |
测试结果
从测试来看还是protobuf效率上占优点:)