zoukankan      html  css  js  c++  java
  • .net core使用EasyNetQ做EventBus

    转自 https://www.cnblogs.com/focus-lei/p/9121095.html

    还没有亲测,有时间反馈测试结果

          随着SOA、微服务、CQRS的盛行,EventBus越来越流行,上GitHub搜了一下,还是有蛮多的这类实现,老牌的有NServiceBus(收费)、MassTransit,最近的有CAP(国人写的,1.4k个Star,非常不错)、ReBus(张队长在NanoFabric中推荐的)、RawRabbit等,今天我介绍的是另外一款产品:EasyNetQ,支持.net core,正如其名,用起来确实very easy。

           首先在asp.net core项目中添加EasyNetQ的nuget引用,注入:

    public void ConfigureServices(IServiceCollection services)
    {
           string rabbitMqConnection = Configuration["RabbitMqConnection"];
           services.AddSingleton(RabbitHutch.CreateBus(rabbitMqConnection));
    }

           1、发布消息:

    复制代码
    public class ValuesController : Controller
        {
            private IBus _bus;
            public ValuesController(IBus bus)
            {
                _bus = bus;
            }
    
    
            // POST api/values
            [HttpPost]
            public async Task Post([FromBody]Order message)
            {
                await _bus.PublishAsync(message);
            }
        }
    复制代码

              可自定义消息的Exchange和Queue,需要修改消息实体:

        [Queue("Qka.Order", ExchangeName = "Qka.Order")]
        public class Order
        {
            public int OrderId { get; set; }
        }

              2、订阅消息

       我们订阅的服务也是寄宿在asp.net core程序中,首先定义消费者:

    复制代码
        public class OrderConsumer : IConsume<Order>
        {
            [AutoSubscriberConsumer(SubscriptionId = "OrderService")]
            public void Consume(Order message)
            {
                //业务代码
            }
        }
    复制代码

              也可以使用异步方法:  

    复制代码
        public class OrderConsumer : IConsumeAsync<Order>
        {
            [AutoSubscriberConsumer(SubscriptionId = "OrderService")]
            public Task ConsumeAsync(Order message)
            {
                //业务代码
            }
        }
    复制代码

             利用EasyNetQ的自动订阅者进行订阅,这里扩展IApplicationBuilder:

    复制代码
    public static class ApplicationExtenssion
    {
            public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
            {
                var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;
    
                var lifeTime = services.GetService<IApplicationLifetime>();
                var bus = services.GetService<IBus>();
                lifeTime.ApplicationStarted.Register(() =>
                {
                    var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
                    subscriber.Subscribe(assembly);         
    subscriber.SubscribeAsync(assembly);
    });

            lifeTime.ApplicationStopped.Register(() => { bus.Dispose(); });

            return appBuilder;
         }
    }
    复制代码

              使用扩展方法:

            public void Configure(IApplicationBuilder app, IHostingEnvironment env)
            {
                app.UseSubscribe("OrderService", Assembly.GetExecutingAssembly());
            }

  • 相关阅读:
    298. Binary Tree Longest Consecutive Sequence
    117. Populating Next Right Pointers in Each Node II
    116. Populating Next Right Pointers in Each Node
    163. Missing Ranges
    336. Palindrome Pairs
    727. Minimum Window Subsequence
    211. Add and Search Word
    年底购物狂欢,移动支付安全不容忽视
    成为程序员前需要做的10件事
    全球首推iOS应用防破解技术!
  • 原文地址:https://www.cnblogs.com/jzz228/p/11276893.html
Copyright © 2011-2022 走看看