zoukankan      html  css  js  c++  java
  • .net core使用EasyNetQ做EventBus

    .net core使用EasyNetQ做EventBus

            </h1>
            <div class="clear"></div>
            <div class="postBody">
    

           随着SOA、微服务、CQRS的盛行,EventBus越来越流行,上GitHub搜了一下,还是有蛮多的这类实现,老牌的有NServiceBus(收费)、MassTransit,最近的有CAP(国人写的,1.4k个Star,非常不错)、ReBus(张队长在NanoFabric中推荐的)、RawRabbit等,今天我介绍的是另外一款产品:EasyNetQ,支持.net core,正如其名,用起来确实very easy。

           首先在asp.net core项目中添加EasyNetQ的nuget引用,注入:

    public void ConfigureServices(IServiceCollection services)
    {
           string rabbitMqConnection = Configuration["RabbitMqConnection"];
           services.AddSingleton(RabbitHutch.CreateBus(rabbitMqConnection));
    }

           1、发布消息:

    复制代码
    public class ValuesController : Controller
        {
            private IBus _bus;
            public ValuesController(IBus bus)
            {
                _bus = bus;
            }
    
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> POST api/values</span>
    

    [HttpPost]
    public async Task Post([FromBody]Order message)
    {
    await _bus.PublishAsync(message);
    }
    }

    复制代码

              可自定义消息的Exchange和Queue,需要修改消息实体:

        [Queue("Qka.Order", ExchangeName = "Qka.Order")]
        public class Order
        {
            public int OrderId { get; set; }
        }

              2、订阅消息

       我们订阅的服务也是寄宿在asp.net core程序中,首先定义消费者:

    复制代码
        public class OrderConsumer : IConsume<Order>
        {
            [AutoSubscriberConsumer(SubscriptionId = "OrderService")]
            public void Consume(Order message)
            {
                //业务代码
            }
        }
    复制代码

              也可以使用异步方法:  

    复制代码
        public class OrderConsumer : IConsumeAsync<Order>
        {
            [AutoSubscriberConsumer(SubscriptionId = "OrderService")]
            public Task ConsumeAsync(Order message)
            {
                //业务代码
            }
        }
    复制代码

             利用EasyNetQ的自动订阅者进行订阅,这里扩展IApplicationBuilder:

    复制代码
    public static class ApplicationExtenssion
    {
            public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
            {
                var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;
    
            </span><span style="color: #0000ff;">var</span> lifeTime = services.GetService&lt;IApplicationLifetime&gt;<span style="color: #000000;">();
            </span><span style="color: #0000ff;">var</span> bus = services.GetService&lt;IBus&gt;<span style="color: #000000;">();
            lifeTime.ApplicationStarted.Register(() </span>=&gt;<span style="color: #000000;">
            {
                </span><span style="color: #0000ff;">var</span> subscriber = <span style="color: #0000ff;">new</span><span style="color: #000000;"> AutoSubscriber(bus, subscriptionIdPrefix);
                subscriber.Subscribe(assembly);         <br></span>                subscriber.SubscribeAsync(assembly);<br><span>             }); <br><br>        lifeTime.ApplicationStopped.Register(() </span>=&gt;<span> { bus.Dispose(); }); <br><br></span><span style="color: #0000ff;">        return</span><span> appBuilder; <br>     } <br>}</span></pre>
    
    复制代码

              使用扩展方法:

            public void Configure(IApplicationBuilder app, IHostingEnvironment env)
            {
                app.UseSubscribe("OrderService", Assembly.GetExecutingAssembly());
            }

  • 相关阅读:
    ios中的几种多线程实现
    在mac下使用终端管理svn
    关于UIScrollViewDelegate协议中每个回调函数的意义及执行顺序的理解
    UIView 及其子类对象 抖动效果的实现
    ios、andriod、cocos2d 视图层次理解
    委托  通知中心   监听/观察
    iphone 中使用苹果禁用的私有Framework
    关于苹果官方网站Reachability检测网络的总结
    iOS设备的分辨率
    ios开发多线程、网络请求的理解 错误码的理解
  • 原文地址:https://www.cnblogs.com/owenzh/p/11451158.html
Copyright © 2011-2022 走看看