zoukankan      html  css  js  c++  java
  • 使用NBear.MQ分布式服务消息队列模块开发分布式系统

    NBear.MQ是NBearFramework中新增的分布式消息队列模块,作为NBear Framework的新成员,NBear.MQ秉承NBear一贯的易于使用和零配置需要的特点,大大改善开发基于消息队列的分布式系统的效率。本文通过介绍一个基于NBear.MQ的Sample - TestServiceMQ,演示基于NBear.MQ开发分布式系统的基本方法。

    下载

    从SF.NET下载NBear_v1.8.0(包括全部框架及示例源码)
    如果你只想先体验一下已编译的示例程序,您也可以从这里下TestServiceMQ_bin.zip 
    (请按如下顺序执行bin中的代码:TestRemotingServer.exe, TestServiceHost.exe,TestClientApp.exe,ServiceHost和Client可以是多个,server将自动随机分配调用service的请求到各个service host进行处理)

    注:示例程序会使用本机的8000端口,如果本机8000端口已被占用或被禁用,请修改代码以使用其它端口代替或先启用8000端口。

    解析

    1、TestRemotingServer

    首先是我们的Server,对于Server,如果您使用NBear.MQ内置的MemoryServiceMQ,则几乎不需要编码,只需要运行并发布server实例为remoting service。

     1    class Program
     2    {
     3        static void Main(string[] args)
     4        {
     5            MemoryServiceMQ mq = new MemoryServiceMQ();
     6            mq.OnLog = new LogHandler(Console.WriteLine);
     7            RemotingServiceHelper rh =
     8                new RemotingServiceHelper(RemotingChannelType.TCP, "127.0.0.1"8000new LogHandler(Console.WriteLine));
     9            rh.PublishServiceInstance("MMQ"typeof(IServiceMQ), mq, System.Runtime.Remoting.WellKnownObjectMode.Singleton);
    10
    11            while (Console.ReadLine() != "q")
    12            {
    13            }

    14        }

    15    }

    注意,关键是第8-9行这里,我们调用RemotingServiceHelper类发布MemoryServiceMQ的实例mq到tcp://127.0.0.1:8000/MMQ。

    如果你不希望使用MemoryServiceMQ,而希望使用基于其它MQ如MSMQ或ActiveMQ系统的MQ控制,您可以自己实现IServiceMQ接口,用来代替这里的mq。

    2、TestServiceHost

    Service host顾名思义就是服务的提供者,本示例的TestServiceHost定义了两个service,MathService和HelloWorldService,如下:

     1    public class MathService : BaseAutoService
     2    {
     3        public MathService(IServiceMQ mq)
     4            : base("demo.math", mq)
     5        {
     6        }

     7
     8        private int getResult(char op, int x, int y)
     9        {
    10            int rt = 0;
    11            switch (op)
    12            {
    13                case '+':
    14                    rt = x + y;
    15                    break;
    16                case '-':
    17                    rt = x - y;
    18                    break;
    19                case '*':
    20                    rt = x * y;
    21                    break;
    22                case '/':
    23                    rt = x / y;
    24                    break;
    25
    26            }

    27            return rt;
    28        }

    29
    30        protected override ResponseMessage Run(RequestMessage msg)
    31        {
    32            Parameter[] parms = msg.Parameters;
    33            int rt = getResult(parms[0].Value.ToString()[0], int.Parse(parms[1].Value.ToString()), int.Parse(parms[2].Value.ToString()));
    34            ResponseMessage retMsg = new ResponseMessage();
    35            retMsg.ServiceName = msg.ServiceName;
    36            retMsg.Parameters = new Parameter[] new Parameter("Result", rt) };
    37            retMsg.MessageId = Guid.NewGuid();
    38            retMsg.TransactionId = msg.TransactionId;
    39            retMsg.RequestHeader = msg.Header;
    40            retMsg.Timestamp = DateTime.Now;
    41            retMsg.Expiration = DateTime.Now.AddDays(1);
    42
    43            return retMsg;
    44        }

    45    }

    46
    47    public class HelloWorldService : BaseAutoService
    48    {
    49        public HelloWorldService(IServiceMQ mq) : base("demo.helloworld", mq)
    50        {
    51        }

    52
    53        protected override ResponseMessage Run(RequestMessage msg)
    54        {
    55            Parameter[] parms = msg.Parameters;
    56            ResponseMessage retMsg = new ResponseMessage();
    57            retMsg.ServiceName = msg.ServiceName;
    58            retMsg.MessageId = Guid.NewGuid();
    59            retMsg.TransactionId = msg.TransactionId;
    60            retMsg.RequestHeader = msg.Header;
    61            retMsg.Timestamp = DateTime.Now;
    62            retMsg.Expiration = DateTime.Now.AddDays(1);
    63
    64            retMsg.Text = "hello world";
    65            retMsg.Bytes = System.Text.UTF8Encoding.UTF8.GetBytes(retMsg.Text);
    66            retMsg.Data = new System.Data.DataSet("hello world data");
    67
    68            return retMsg;
    69        }

    70    }

    大家可以看到,这里的service都是从BaseAutoService继承的,您也可以实现IService或继承BaseService。一个可以被NBear.MQ使用的Service基本上做的事情就是接受一个RequestMessage类型的输入参数,返回一个ResponseMessage类型的返回结果。

    RequestMessage和ResponseMessage都从Message类型继承,除了包含请求和返回相关的一组参数(.net基本数据类型),每个Message包含Text、Bytes和Data属性,分别允许存放和传递string,byte[]和Dataset类型的数据。可见Message实际是一个比较通用的DTO。

    下面,我们要将示例service注册到server,这个过程可以非常方便的通过SimpleServiceContainer来实现:

     1    class Program
     2    {
     3        static void Main(string[] args)
     4        {
     5            IServiceMQ mq = new RemotingClientHelper(RemotingChannelType.TCP, "127.0.0.1"80000new LogHandler(Console.WriteLine)).GetClientInstance<IServiceMQ>("MMQ");
     6            IServiceContainer container = new SimpleServiceContainer(mq);
     7
     8            container.RegisterComponent("math host 1"typeof(MathService));
     9            container.RegisterComponent("math host 2"typeof(MathService));
    10            container.RegisterComponent("helloworld 1"typeof(HelloWorldService));
    11
    12            while (Console.ReadLine() != "q")
    13            {
    14            }

    15        }

    16    }

    您可以看到,我们首先要通过RemotingClientHelper类获得我们的server发布出来的mq实例。然后,创建一个SimpleServiceContainer实例,并将我们的示例service添加到container,就这么简单。SimpleSeviceContainer会自动为我们注册和绑定service和mq,您也可以注册多个相同和不同类型的service到container。

    3、TestClientApp

    Client是一个WinForm程序,自然就是调用Service了。代码如下:

      1using System;
      2using System.Collections.Generic;
      3using System.ComponentModel;
      4using System.Data;
      5using System.Drawing;
      6using System.Text;
      7using System.Windows.Forms;
      8
      9using NBear.MQ.ServiceMQ;
     10using NBear.MQ;
     11using NBear.Common;
     12using NBear.Common.Remoting;
     13
     14namespace TestClientApp
     15{
     16    public partial class Form1 : Form
     17    {
     18        public Form1()
     19        {
     20            InitializeComponent();
     21        }

     22
     23        private void Form1_Load(object sender, EventArgs e)
     24        {
     25            comboOP.SelectedIndex = 0;
     26
     27            rh = new RemotingClientHelper(RemotingChannelType.TCP, "127.0.0.1"80000new LogHandler(Console.WriteLine));
     28            mq = rh.GetClientInstance<IServiceMQ>("MMQ");
     29            mq.OnLog = new LogHandler(Console.WriteLine);
     30            container = new SimpleServiceContainer(mq);
     31        }

     32
     33        private void txtX_TextChanged(object sender, EventArgs e)
     34        {
     35            txtResult.Text = string.Empty;
     36        }

     37
     38        private void comboOP_SelectedIndexChanged(object sender, EventArgs e)
     39        {
     40            txtResult.Text = string.Empty;
     41        }

     42
     43        private void txtY_TextChanged(object sender, EventArgs e)
     44        {
     45            txtResult.Text = string.Empty;
     46        }

     47
     48        RemotingClientHelper rh = null;
     49        IServiceMQ mq = null;
     50        IServiceContainer container = null;
     51
     52        private void btnCompute_Click(object sender, EventArgs e)
     53        {
     54            try
     55            {
     56                int.Parse(txtX.Text);
     57                if (comboOP.SelectedIndex == 3)
     58                {
     59                    if (int.Parse(txtY.Text) != 0)
     60                    {
     61                    }

     62                }

     63                else
     64                {
     65                    int.Parse(txtY.Text);
     66                }

     67            }

     68            catch
     69            {
     70                MessageBox.Show("x, y must be number, and y must not be 0 when OP is '/'!");
     71                return;
     72            }

     73
     74            RequestMessage msg = new RequestMessage();
     75            msg.Expiration = DateTime.Now.AddDays(1);
     76            msg.MessageId = Guid.NewGuid();
     77            msg.Parameters = new Parameter[] new Parameter("op", comboOP.Text), new Parameter("x"int.Parse(txtX.Text)), new Parameter("y"int.Parse(txtY.Text)) };
     78            msg.ServiceName = "demo.math";
     79            msg.Timestamp = DateTime.Now;
     80            msg.TransactionId = Guid.NewGuid();
     81
     82            ResponseMessage retMsg = container.CallService(msg.ServiceName, msg);
     83
     84            if (retMsg != null)
     85            {
     86                txtResult.Text = retMsg.Parameters[0].Value.ToString();
     87            }

     88            else
     89            {
     90                MessageBox.Show("Call service failed!");
     91            }

     92        }

     93
     94        private void btnHelloWorld_Click(object sender, EventArgs e)
     95        {
     96            RequestMessage msg = new RequestMessage();
     97            msg.Expiration = DateTime.Now.AddDays(1);
     98            msg.MessageId = Guid.NewGuid();
     99            msg.ServiceName = "demo.helloworld";
    100            msg.Timestamp = DateTime.Now;
    101            msg.TransactionId = Guid.NewGuid();
    102
    103            msg.Text = "who r u?";
    104
    105            ResponseMessage retMsg = container.CallService(msg.ServiceName, msg);
    106
    107            MessageBox.Show(retMsg.Text);
    108        }

    109    }

    110}

    Line 23-31  在Form Load是初始化container,这和service host是完全一样的。

    调用service只需通过container.CallService方法,如Line 82 和 105。

    问题

    1、您可能会问,client的代码除了调用service和service host的代码几乎相同,那么是否可以在client中既使用service又提供service呢 ?当然可以,你也完全可以在client这里register service,当client调用一个service时,container会首先检查是否存在本地service,存在则调用本地,不存在则向server请求远程service。

    2、本示例是在一台服务器上演示这个分布式构架,您也可以修改地址,并分别把server, service host和client部署到互相连接的多台server上。

    运行

    请按如下顺序执行bin中的代码:TestRemotingServer.exe, TestServiceHost.exe,TestClientApp.exe,ServiceHost和Client可以是多个,server将自动随机分配调用service的请求到各个service host进行处理。在运行过程中,请尝试多次点击client的按钮,并观察,server和service host的日志,看是不是负载均衡。您也可以尝试在运行过程中关闭某些service host或client,或者再新增一些,注意server会自动维护service host和client的绑定和回调。

    致谢

    NBear.MQ很多地方参考了alex兄的QPG分布式构架,并得到了alex兄的许多指点,在此表示最高的谢意!

  • 相关阅读:
    logstash日志分析的配置和使用
    实现跨浏览器html5表单验证
    CSS常见居中讨论
    centos7 初始化脚本
    elasticsearch+logstash+redis+kibana 实时分析nginx日志
    centos7 系统优化
    cAdvisor+InfluxDB+Grafana 监控Docker
    Docker三剑客之Docker Swarm
    Docker三剑客之常用命令
    Docker三剑客之Docker Compose
  • 原文地址:https://www.cnblogs.com/teddyma/p/414059.html
Copyright © 2011-2022 走看看