NBear.MQ是NBearFramework中新增的分布式消息队列模块,作为NBear Framework的新成员,NBear.MQ秉承NBear一贯的易于使用和零配置需要的特点,大大改善开发基于消息队列的分布式系统的效率。本文通过介绍一个基于NBear.MQ的Sample - TestServiceMQ,演示基于NBear.MQ开发分布式系统的基本方法。
下载
从SF.NET下载NBear_v1.8.0(包括全部框架及示例源码)
如果你只想先体验一下已编译的示例程序,您也可以从这里下TestServiceMQ_bin.zip
(请按如下顺序执行bin中的代码:TestRemotingServer.exe, TestServiceHost.exe,TestClientApp.exe,ServiceHost和Client可以是多个,server将自动随机分配调用service的请求到各个service host进行处理)
注:示例程序会使用本机的8000端口,如果本机8000端口已被占用或被禁用,请修改代码以使用其它端口代替或先启用8000端口。
解析
1、TestRemotingServer
首先是我们的Server,对于Server,如果您使用NBear.MQ内置的MemoryServiceMQ,则几乎不需要编码,只需要运行并发布server实例为remoting service。
1
class Program
2
{
3
static void Main(string[] args)
4
{
5
MemoryServiceMQ mq = new MemoryServiceMQ();
6
mq.OnLog = new LogHandler(Console.WriteLine);
7
RemotingServiceHelper rh =
8
new RemotingServiceHelper(RemotingChannelType.TCP, "127.0.0.1", 8000, new LogHandler(Console.WriteLine));
9
rh.PublishServiceInstance("MMQ", typeof(IServiceMQ), mq, System.Runtime.Remoting.WellKnownObjectMode.Singleton);
10
11
while (Console.ReadLine() != "q")
12
{
13
}
14
}
15
}

2

3

4

5

6

7

8

9

10

11

12

13

14

15

注意,关键是第8-9行这里,我们调用RemotingServiceHelper类发布MemoryServiceMQ的实例mq到tcp://127.0.0.1:8000/MMQ。
如果你不希望使用MemoryServiceMQ,而希望使用基于其它MQ如MSMQ或ActiveMQ系统的MQ控制,您可以自己实现IServiceMQ接口,用来代替这里的mq。
2、TestServiceHost
Service host顾名思义就是服务的提供者,本示例的TestServiceHost定义了两个service,MathService和HelloWorldService,如下:
1
public class MathService : BaseAutoService
2
{
3
public MathService(IServiceMQ mq)
4
: base("demo.math", mq)
5
{
6
}
7
8
private int getResult(char op, int x, int y)
9
{
10
int rt = 0;
11
switch (op)
12
{
13
case '+':
14
rt = x + y;
15
break;
16
case '-':
17
rt = x - y;
18
break;
19
case '*':
20
rt = x * y;
21
break;
22
case '/':
23
rt = x / y;
24
break;
25
26
}
27
return rt;
28
}
29
30
protected override ResponseMessage Run(RequestMessage msg)
31
{
32
Parameter[] parms = msg.Parameters;
33
int rt = getResult(parms[0].Value.ToString()[0], int.Parse(parms[1].Value.ToString()), int.Parse(parms[2].Value.ToString()));
34
ResponseMessage retMsg = new ResponseMessage();
35
retMsg.ServiceName = msg.ServiceName;
36
retMsg.Parameters = new Parameter[] { new Parameter("Result", rt) };
37
retMsg.MessageId = Guid.NewGuid();
38
retMsg.TransactionId = msg.TransactionId;
39
retMsg.RequestHeader = msg.Header;
40
retMsg.Timestamp = DateTime.Now;
41
retMsg.Expiration = DateTime.Now.AddDays(1);
42
43
return retMsg;
44
}
45
}
46
47
public class HelloWorldService : BaseAutoService
48
{
49
public HelloWorldService(IServiceMQ mq) : base("demo.helloworld", mq)
50
{
51
}
52
53
protected override ResponseMessage Run(RequestMessage msg)
54
{
55
Parameter[] parms = msg.Parameters;
56
ResponseMessage retMsg = new ResponseMessage();
57
retMsg.ServiceName = msg.ServiceName;
58
retMsg.MessageId = Guid.NewGuid();
59
retMsg.TransactionId = msg.TransactionId;
60
retMsg.RequestHeader = msg.Header;
61
retMsg.Timestamp = DateTime.Now;
62
retMsg.Expiration = DateTime.Now.AddDays(1);
63
64
retMsg.Text = "hello world";
65
retMsg.Bytes = System.Text.UTF8Encoding.UTF8.GetBytes(retMsg.Text);
66
retMsg.Data = new System.Data.DataSet("hello world data");
67
68
return retMsg;
69
}
70
}

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

大家可以看到,这里的service都是从BaseAutoService继承的,您也可以实现IService或继承BaseService。一个可以被NBear.MQ使用的Service基本上做的事情就是接受一个RequestMessage类型的输入参数,返回一个ResponseMessage类型的返回结果。
RequestMessage和ResponseMessage都从Message类型继承,除了包含请求和返回相关的一组参数(.net基本数据类型),每个Message包含Text、Bytes和Data属性,分别允许存放和传递string,byte[]和Dataset类型的数据。可见Message实际是一个比较通用的DTO。
下面,我们要将示例service注册到server,这个过程可以非常方便的通过SimpleServiceContainer来实现:
1
class Program
2
{
3
static void Main(string[] args)
4
{
5
IServiceMQ mq = new RemotingClientHelper(RemotingChannelType.TCP, "127.0.0.1", 8000, 0, new LogHandler(Console.WriteLine)).GetClientInstance<IServiceMQ>("MMQ");
6
IServiceContainer container = new SimpleServiceContainer(mq);
7
8
container.RegisterComponent("math host 1", typeof(MathService));
9
container.RegisterComponent("math host 2", typeof(MathService));
10
container.RegisterComponent("helloworld 1", typeof(HelloWorldService));
11
12
while (Console.ReadLine() != "q")
13
{
14
}
15
}
16
}

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

您可以看到,我们首先要通过RemotingClientHelper类获得我们的server发布出来的mq实例。然后,创建一个SimpleServiceContainer实例,并将我们的示例service添加到container,就这么简单。SimpleSeviceContainer会自动为我们注册和绑定service和mq,您也可以注册多个相同和不同类型的service到container。
3、TestClientApp
Client是一个WinForm程序,自然就是调用Service了。代码如下:
1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel;
4
using System.Data;
5
using System.Drawing;
6
using System.Text;
7
using System.Windows.Forms;
8
9
using NBear.MQ.ServiceMQ;
10
using NBear.MQ;
11
using NBear.Common;
12
using NBear.Common.Remoting;
13
14
namespace TestClientApp
15
{
16
public partial class Form1 : Form
17
{
18
public Form1()
19
{
20
InitializeComponent();
21
}
22
23
private void Form1_Load(object sender, EventArgs e)
24
{
25
comboOP.SelectedIndex = 0;
26
27
rh = new RemotingClientHelper(RemotingChannelType.TCP, "127.0.0.1", 8000, 0, new LogHandler(Console.WriteLine));
28
mq = rh.GetClientInstance<IServiceMQ>("MMQ");
29
mq.OnLog = new LogHandler(Console.WriteLine);
30
container = new SimpleServiceContainer(mq);
31
}
32
33
private void txtX_TextChanged(object sender, EventArgs e)
34
{
35
txtResult.Text = string.Empty;
36
}
37
38
private void comboOP_SelectedIndexChanged(object sender, EventArgs e)
39
{
40
txtResult.Text = string.Empty;
41
}
42
43
private void txtY_TextChanged(object sender, EventArgs e)
44
{
45
txtResult.Text = string.Empty;
46
}
47
48
RemotingClientHelper rh = null;
49
IServiceMQ mq = null;
50
IServiceContainer container = null;
51
52
private void btnCompute_Click(object sender, EventArgs e)
53
{
54
try
55
{
56
int.Parse(txtX.Text);
57
if (comboOP.SelectedIndex == 3)
58
{
59
if (int.Parse(txtY.Text) != 0)
60
{
61
}
62
}
63
else
64
{
65
int.Parse(txtY.Text);
66
}
67
}
68
catch
69
{
70
MessageBox.Show("x, y must be number, and y must not be 0 when OP is '/'!");
71
return;
72
}
73
74
RequestMessage msg = new RequestMessage();
75
msg.Expiration = DateTime.Now.AddDays(1);
76
msg.MessageId = Guid.NewGuid();
77
msg.Parameters = new Parameter[] { new Parameter("op", comboOP.Text), new Parameter("x", int.Parse(txtX.Text)), new Parameter("y", int.Parse(txtY.Text)) };
78
msg.ServiceName = "demo.math";
79
msg.Timestamp = DateTime.Now;
80
msg.TransactionId = Guid.NewGuid();
81
82
ResponseMessage retMsg = container.CallService(msg.ServiceName, msg);
83
84
if (retMsg != null)
85
{
86
txtResult.Text = retMsg.Parameters[0].Value.ToString();
87
}
88
else
89
{
90
MessageBox.Show("Call service failed!");
91
}
92
}
93
94
private void btnHelloWorld_Click(object sender, EventArgs e)
95
{
96
RequestMessage msg = new RequestMessage();
97
msg.Expiration = DateTime.Now.AddDays(1);
98
msg.MessageId = Guid.NewGuid();
99
msg.ServiceName = "demo.helloworld";
100
msg.Timestamp = DateTime.Now;
101
msg.TransactionId = Guid.NewGuid();
102
103
msg.Text = "who r u?";
104
105
ResponseMessage retMsg = container.CallService(msg.ServiceName, msg);
106
107
MessageBox.Show(retMsg.Text);
108
}
109
}
110
}

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

Line 23-31 在Form Load是初始化container,这和service host是完全一样的。
调用service只需通过container.CallService方法,如Line 82 和 105。
问题
1、您可能会问,client的代码除了调用service和service host的代码几乎相同,那么是否可以在client中既使用service又提供service呢 ?当然可以,你也完全可以在client这里register service,当client调用一个service时,container会首先检查是否存在本地service,存在则调用本地,不存在则向server请求远程service。
2、本示例是在一台服务器上演示这个分布式构架,您也可以修改地址,并分别把server, service host和client部署到互相连接的多台server上。
运行
请按如下顺序执行bin中的代码:TestRemotingServer.exe, TestServiceHost.exe,TestClientApp.exe,ServiceHost和Client可以是多个,server将自动随机分配调用service的请求到各个service host进行处理。在运行过程中,请尝试多次点击client的按钮,并观察,server和service host的日志,看是不是负载均衡。您也可以尝试在运行过程中关闭某些service host或client,或者再新增一些,注意server会自动维护service host和client的绑定和回调。
致谢
NBear.MQ很多地方参考了alex兄的QPG分布式构架,并得到了alex兄的许多指点,在此表示最高的谢意!