zoukankan      html  css  js  c++  java
  • MassTransit 实现应用程序间交互

    MassTransit 介绍 

    先看下masstransit 官网介绍:MassTransit 是一个自由、开源、轻量级的消息总线, 用于使用. NET 框架创建分布式应用程序。MassTransit 在现有消息传输提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。前段时间看eshop文档,在微服务之间实现基于事件的通信章节。有提到MassTransit是实现event Bus技术之一,于是就翻译了几篇

    下面利用MassTransit实现eshop中的一个简单实例(前提你刚好了解eshop中的这个场景): Catelog微服务中产品价格更改,Basket微服务通知购车价格变动

     

    如果没看源码,这个场景也很常见很好理解,我们在接下来创建两个webapi站点和一个类库来演示如何实现上述场景

    在类库中定义产品价格变动消息

     public interface IProductPriceChanged
        {
            int ProductId { get; set; }
    
            decimal NewPrice { get; set; }
    
            decimal OldPrice { get; set; }
    
     }

    MassTransit.Catalog站点 

    引入一下包:MassTransit、MassTransit.RabbitMQ、Autofac.Extensions.DependencyInjection。在Startup类ConfigureServices中添加:

     builder.Register(c =>
                {
                    return Bus.Factory.CreateUsingRabbitMq(cfg =>
                    {
                        cfg.Host(new Uri($"rabbitmq://{Configuration["HostName"]}"), h =>
                        {
                            h.Username(Configuration["UserName"]);
                            h.Password(Configuration["Password"]);
                        });
                    });
                })
                .As<IBusControl>()
                .As<IBus>()
                .As<IPublishEndpoint>()
                .SingleInstance();
                builder.Populate(services);
                container = builder.Build();
                return new AutofacServiceProvider(container);

    然后在Configure方法添加:  

    var bus = container.Resolve<IBusControl>();
    var busHandle = TaskUtil.Await(() => bus.StartAsync());
     lifetime.ApplicationStopping.Register(() => busHandle.Stop());

    最后在Controller添加模拟修改价格的接口

     private readonly IBus _bus;
            public ValuesController(IBus bus)
            {
                _bus = bus;
            }
            // GET api/values
            [HttpGet]
            public async Task<IEnumerable<string>> GetAsync()
            {
                await _bus.Publish<IProductPriceChanged>(new
                {
                    ProductId=100,
                    NewPrice=1999,
                    OldPrice=2000,
                });
                return new string[] { "价格更改" };
            }

    到此发布端已经完成了 

    MassTransit.Basket站点

    同样引入包,并在Configure方法中添加订阅的代码   

    var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
                {
                    var host = sbc.Host(new Uri($"rabbitmq://{Configuration["RabbitMQ:HostName"]}"), h =>
                    {
                        h.Username(Configuration["RabbitMQ:UserName"]);
                        h.Password(Configuration["RabbitMQ:Password"]);
                    });
                    sbc.ReceiveEndpoint(host, "ProductPriceChangedQueue", e =>
                    {
                        e.Consumer<ProductPriceChangedConsumer>();
                    });
    
                });
                // start/stop the bus with the web application
                applicationLifetime.ApplicationStarted.Register(bus.Start);
                applicationLifetime.ApplicationStopped.Register(bus.Stop);

    消费端 你可以使用其他方式寄宿比如控制台等;

    最后运行两个站点看下输出

     

  • 相关阅读:
    一文弄懂Pytorch的DataLoader, DataSet, Sampler之间的关系
    常用Git命令大全
    职场被边缘化是怎样的
    HTTP网络设置
    java黑科技
    cmd执行java命令
    python 使用镜像下载依赖包
    初识go语言
    华为交换机(S5720)配置日志外发syslog服务器
    Neo4j记录
  • 原文地址:https://www.cnblogs.com/chengtian/p/9068079.html
Copyright © 2011-2022 走看看