zoukankan      html  css  js  c++  java
  • NServiceBus+RabbitMQ开发分布式应用

    前言

    NServiceBus提供了8种传输管道组件,分别是Learning、MSMQ、Azure Service Bus、Azure Service Bus (Legacy)、Azure Storage Queues、SQL Server、RabbitMQ、Amazon SQS。前两篇我们主要用的是Learnning,这篇使用RabbitMQ,也可以直接用于生产。

    安装RabbitMQ组件

    RabbitMQ不在NServiceBus下,是一个单独的组件,需要单独安装。
    QQ截图20191025114539.jpg

    设置RabbitMQ连接串

    var transport = config.UseTransport<RabbitMQTransport>();
    transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");
    transport.UseDirectRoutingTopology();
    

    消息生产者(ClientUI)

     class Program
        {
            static ILog log = LogManager.GetLogger<Program>();
            static void Main(string[] args)
            {
                MainAsync().GetAwaiter().GetResult();
            }
    
            static async Task MainAsync()
            {
                Console.Title = "Sample.ClientUI";
                var config = new EndpointConfiguration("Sample.ClientUI");
                config.UseSerialization<NewtonsoftSerializer>();
                config.UsePersistence<InMemoryPersistence>();
                config.EnableInstallers();
    
                var transport = config.UseTransport<RabbitMQTransport>();
                transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");
                transport.UseDirectRoutingTopology();
    
                var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);
    
                await RunAsync(endpointInstance).ConfigureAwait(false);
    
                await endpointInstance.Stop().ConfigureAwait(false);
            }
    
    
            static async Task RunAsync(IEndpointInstance endpointInstance)
            {
                log.Info("Press 'P' to send an PlaceOrder Command");
                while (true)
                {
                    var key = Console.ReadKey();
                    Console.WriteLine();
    
                    switch (key.Key)
                    {
                        case ConsoleKey.P:
                            {
                                var command = new PlaceOrder
                                {
                                    OrderId = Guid.NewGuid().ToString()
                                };
    
                                log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}");
                                await endpointInstance.Send("Sample.Server",command).ConfigureAwait(false);
                                break;
                            }
    
                        case ConsoleKey.Q:
                            return;
                        default:
                            log.Info("Please try again");
                            break;
                    }
                }
            }
        }
    

    先启动ClientUI,启动后先发送几条数据,发送数据观察程序控制台数据是否发送,然后在RabbitMQ控制台里观察Queue的情况,没创建Queue时,这里会自动创建Queue,队列名称和端点名称相同。这里发送消息数据,数据会在Sample.Server队列里。
    QQ截图20191025114228.jpg

    QQ截图20191025114115.jpg

    QQ截图20191025114135.jpg
         这里能看到在队列Sample.Server里有三条未消费的数据,因为我还没有启动Sample.Server控制台程序。

    消息消费者(Server)

    public class Program
    {
        static ILog log = LogManager.GetLogger<Program>();
        static void Main(string[] args)
        {
            MainAsync().GetAwaiter().GetResult();
        }
    
    
        static async Task MainAsync()
        {
            Console.Title = "Sample.Server";
            var config = new EndpointConfiguration("Sample.Server");
            config.UseSerialization<NewtonsoftSerializer>();
            config.UsePersistence<InMemoryPersistence>();
            config.EnableInstallers();
    
            var transport = config.UseTransport<RabbitMQTransport>();
            transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");
            transport.UseDirectRoutingTopology();
    
            var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);
    
            log.Info("Press any key to quit");
            Console.ReadKey();
    
            await endpointInstance.Stop().ConfigureAwait(false);
        }
    }
    
    public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
    {
        static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
        public Task Handle(PlaceOrder message, IMessageHandlerContext context)
        {
            log.Info($"Received PlaceOrder with OrderId:{message.OrderId}");
            return Task.CompletedTask;
        }
    }
    

    启动Sample.Server控制台后,就会消费Queue里的数据,消费完成后观察程序控制台的提示和RabbitMQ控制台的提示。
    QQ截图20191025114221.jpg

    QQ截图20191025114245.jpg
    QQ截图20191025114259.jpg
        这里能看到刚才堆积的三条数据已经被消费了。

    总结

    写完这个demo我就在想在程序里通过使用NServiceBus的RabbitMQ组件和直接在程序里使用RabbitMQ直接调用的区别,到底有没有必要通过NServiceBus调用,还需要进一步考量。

  • 相关阅读:
    2007年8月小记
    2007年7月小记
    CS2007.1 在多应用程序中的单点登录
    C#类型转换2
    checkbox与说明文字无法对齐的问题
    css中的内容溢出
    javascript获取的层(div)高度
    C#类型转换3
    js修改css样式表解析(转)
    (转)javascript选择id class
  • 原文地址:https://www.cnblogs.com/sword-successful/p/11737530.html
Copyright © 2011-2022 走看看