zoukankan      html  css  js  c++  java
  • 支持异步同步的分布式CommandBus MSMQ实现

    支持异步同步的分布式CommandBus MSMQ实现

    先上一张本文所描述的适用场景图

     

    分布式场景,共3台server:

    1. 前端Server
    2. Order App Server
    3. Warehouse App Server

    功能:

    1. 前端Server可以不停的发送Command到CommandBus,然后由CommandBus分配不同的Command到各自的app server去处理。
    2. 前端Server可以只发送Command而不必等待Response
    3. 前端Server可以同步等待Response返回
    4. MSMQ消息超过3.5M会自动转为网络共享方式传输消息
    5. 对于同一Command的处理,可以通过增加App Server的方式来提高并发处理速度(比如:可以开2个app server instance来同时处理ACommand的处理)

    本文目标是用msmq实现分布式CommandBus的应用(已经更新到A2D Framework中了)。

    前端Server发送Command的方式(异步):

    ACommand cmd = new ACommand() { Tag = "aaa" };
    CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
    cmdDistributer.ResultCatached += new CommandResultCatchedDelegate<ACommandResult>(cmdDistributer_ResultCatached);
    cmdDistributer.SendRequest(cmd);

    同步方式:

    ACommand cmd = new ACommand() { Tag = "aaa" };
    CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>(); 
    cmdDistributer.SendRequest(cmd);
    ACommandResult result=cmdDistributer.WaitResponse();

    配置文件

    复制代码
    <?xml version="1.0" encoding="utf-8" ?>
    <CommandBusSetting>
      <AutoCreateIfNotExists>true</AutoCreateIfNotExists>
      <CommandQueue>PC-20130606HCVPprivate$Commands_{0}</CommandQueue>
      <ResponseQueue>PC-20130606HCVPprivate$CommandResponses</ResponseQueue>
      <NetworkLocation>\PC-20130606HCVP
    etwork</NetworkLocation>
    </CommandBusSetting>
    复制代码

    Command的编写方式

    [QueueName("ACommand")]//这个可选,没有QueueName时,默认对应的msmq队列名为类名,此处为ACommand
    public class ACommand : BaseCommand  //需要继承自BaseCommand
    {
            public string Tag { get; set; }//自定义的属性
    }

    后端App Server的编写

    CommandHandlerHost1: Console程序,相当于App Server 1,会处理部分Command

    复制代码
    static void Main(string[] args)
            {
                Thread.Sleep(2000);
    
                CommandHandlerListener listener = new CommandHandlerListener();
                listener.AddHandler(new TestCommandHandlers());
                listener.AddHandler(new Test2CommandHandlers());
                listener.Start();
                Console.ReadKey();
            }
    复制代码

    CommandHandlerHost2: Console程序,相当于App Server 2,会处理部分Command

    复制代码
    static void Main(string[] args)
            {
                Thread.Sleep(2000);
    
                CommandHandlerListener listener = new CommandHandlerListener();
                listener.AddHandler(new Test3CommandHandlers());
                listener.Start();
                Console.ReadKey();
            }
    复制代码

    CommandHandlers: 所有的Command处理函数都会在这个项目中实现

    复制代码
    public class TestCommandHandlers : ICommandHandlers,
            ICommandHandler<ACommand, ACommandResult>,
            ICommandHandler<BCommand, BCommandResult>
        {
            public ACommandResult Handler(ACommand cmd)
            {
                Console.WriteLine("From [public ACommandResult Handler(ACommand cmd)]: " + cmd.Tag);
    
                ACommandResult result = new ACommandResult();
                result.Result = "result from ACommand";
                return result;
            }
    
            public BCommandResult Handler(BCommand cmd)
            {
                Console.WriteLine("From [public BCommandResult Handler(BCommand cmd)]: " + cmd.Tag);
    
                BCommandResult result = new BCommandResult();
                result.Result = "result from BCommand";
                return result;
            }
        }
    复制代码

     下面是目前的性能测试:

    下载代码 

    自省推动进步,视野决定未来。
    心怀远大理想。
    为了家庭幸福而努力。
    A2D科技,服务社会。
    A2D Framework(Alpha)
    • 1. Cache System(本地缓存与分布式缓存共存、支持Memcache和Redis、支持贴标签形式(类似Spring 3.x的Cache形式))
    • 2. Event System(本地事件与分布式事件分发)
    • 3. IoC(自动匹配功能,实例数量限制功能)
    • 4. Sql Dispatcher System(基于Sql server的读写分离系统)
    • 5. Session System(分布式Session系统)
    • 6. 分布式Command Bus(MSMQ实现,解决4M限制)
     
  • 相关阅读:
    ActiveReports 报表应用教程 (3)---图表报表
    ActiveReports 报表应用教程 (4)---分栏报表
    ActiveReports 报表应用教程 (5)---解密电子商务领域首张电子发票的诞生(套打报表)
    hdu4467 Graph
    ActiveReports 报表应用教程 (6)---分组报表
    [leetcode]Search a 2D Matrix
    参加百度开放云编程马拉松后一点总结
    CAS服务器配置
    Tomcat 的 SSL 配置
    在windows server 2008 R2 64bit上面配置PI OPC Server的DCOM
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3302659.html
Copyright © 2011-2022 走看看