zoukankan      html  css  js  c++  java
  • .NET Core微服务之基于EasyNetQ使用RabbitMQ消息队列

    Tip: 此篇已加入.NET Core微服务基础系列文章索引

    一、消息队列与RabbitMQ

    1.1 消息队列

      “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器

      消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:

    当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。  

      消息队列主要解决了应用耦合、异步处理、流量削锋等问题。当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。更多详细内容请参考:《消息队列及其应用场景介绍

      我也在前几年写过一篇基于Redis做消息队列的文章,对消息队列的一个应用场景做了介绍,没有了解过的童鞋可以看看。

    1.2 RabbitMQ

      

      RabbitMQ是一款基于AMQP(高级消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlang等。

      网上有很多性能比较的文章,例如在1百万条1k的消息下,每秒种的收发情况如下图所示:

      性能比较

      这里不过多介绍RabbitMQ,有关RabbitMQ的一些需要了解的概念你可以通过下面的文章了解:

      颜圣杰,《RabbitMQ知多少

      如果你想了解RabbitMQ与Kafka的对比,可以阅读这篇文章:《开源软件成熟度评测报告-分布式消息中间件

      而EasyNetQ呢,它是一款基于RabbitMQ.Client封装的API库,正如其名,使用起来比较Easy,它把原RabbitMQ.Client中的很多操作都进行了再次封装,让开发人员减少了很多工作量。

    二、RabbitMQ的安装

    2.1 Linux下的安装

      这里不演示如何在Linux下安装,但推荐生产环境使用Linux,下面是一些参考资料:

      mcgrady,《Linux下RabbitMQ的安装

      晓晨Master,《.NET Core使用RabbitMQ

      牛头人,《Linux安装RabbitMQ

      一只猪儿虫,《RabbitMQ Linux安装

    2.2 Windows下的安装

      开发环境下,我一般使用Windows Server虚拟机,所以这里说明下如何在Windows下安装:

      (1)下载ErlangRabbitMQ (这里我选则的并非最新版本,而是etp20.3和rabbitmq3.7.5)

      

      (2)首先安装Erlang,然后添加环境变量(如果添加了,则skip这一步)并加到PATH中

      

      (3)其次安装RabbitMQ,一路Next,安装完成后也为其添加环境变量并添加到PATH中

      

      

      (4)检查是否安装成功:rabbitmqctl status

      这里我碰到了如下的错误:

      

      解决方法:

      更正erlang.cookie文件,详情请参考:https://blog.csdn.net/u012637358/article/details/80078610

      最终状态:

      

      检查Windows服务,发现已经自动注册了一个服务:

      

      (5)激活Web管理插件,然后检查是否可见(http://127.0.0.1:15672)

      

      

    2.3 一些必要的配置

      (1)使用默认账号:guest/guest登录进去,添加一个新用户(Administrator权限),并设置其Permission

      

      (2)添加新的虚拟机(默认为/,这里我添加一个名为EDCVHOST的虚拟机)

      

      (3)绑定新添加的用户到新的虚拟机上,接下来在我们的程序中就主要使用admin这个用户和EDCVHOST这个虚拟机

      

      *.当然,为了安全考虑,你也可以把guest用户remove掉

    三、Quick Start:第一个消息队列

    3.1 项目准备

      这里为了快速的演示如何使用EasyNetQ,我们来一个QuickStart,准备三个项目:两个Console程序和一个Class Library。

      

      其中,对Publisher和Subscriber项目安装EasyNetQ:

    NuGet>Install-Package EasyNetQ  

      针对Messages类库,新增一个class如下:

        public class TextMessage
        {
            public string Text { get; set; }
        }

    3.2 我是Publisher

      添加以下代码:

    复制代码
        public class Program
        {
            public static void Main(string[] args)
            {
                var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison";
    
                using (var bus = RabbitHutch.CreateBus(connStr))
                {
                    var input = "";
                    Console.WriteLine("Please enter a message. 'Quit' to quit.");
                    while ((input = Console.ReadLine()) != "Quit")
                    {
                        bus.Publish(new TextMessage
                        {
                            Text = input
                        });
                    }
                }
            }
        }
    复制代码

      可以看到,我们在其中使用EasyNetQ高度封装的接口创建了一个IBus接口的实例,通过这个IBus实例我们可以通过一个超级Easy的Publish接口进行发布消息。这里主要是读取用户在控制台中输入的消息字符串进行发送。实际中,发送的一般都是一个或多个复杂的实体对象。

    3.3 我是Subscriber

      添加如下所示代码:

    复制代码
        public class Program
        {
            public static void Main(string[] args)
            {
                var connStr = "host=192.168.80.71;virtualHost=EDCVHOST;username=admin;password=edison";
    
                using (var bus = RabbitHutch.CreateBus(connStr))
                {
                    bus.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage);
    
                    Console.WriteLine("Listening for messages. Hit <return> to quit.");
                    Console.ReadLine();
                }
            }
    
            public static void HandleTextMessage(TextMessage textMessage)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("Got message: {0}", textMessage.Text);
                Console.ResetColor();
            }
        }
    复制代码

      这里主要是通过IBus实例去订阅消息(这里是除非用户关闭程序否则一直处于监听状态),当发布者发布了指定类型的消息之后,这里就把它打印出来(红色字体显示)。

    3.4 简单测试 

      通过控制台信息查看结果:

      

      通过RabbitMQ管理界面查看:

      (1)通过Connections Tab可以发现我们的两个客户端都在Running中

      

      (2)通过Queues Tab查看目前已有的队列=>可以看到目前我们只注册了一个队列

      

    四、在ASP.NET Core中的使用

    4.1 案例结构与说明

      这里假设有这样一个场景,客户通过浏览器提交了一个保单,这个保单中包含一些客户信息,ClientService将这些信息处理后发送一个消息到RabbitMQ中,NoticeService和ZAPEngineService订阅了这个消息。NoticeService会将客户信息取出来并获取一些更多信息为客户发送Email,而ZAPEngineService则会根据客户的一些关键信息(比如:年龄,是否吸烟,学历,年收入等等)去数据库读取一些规则来生成一份Question List并存入数据库。

    4.2 项目准备工作

      创建上面提到的这几个项目,这里我选择ASP.NET Core WebAPI类型。

      分别为这几个项目通过NuGet安装EasyNetQ组件,并且通过以下代码注入统一的IBus实例对象:

    复制代码
        public IServiceProvider ConfigureServices(IServiceCollection services)
        {
          // IoC - EventBus
          services.AddSingleton(RabbitHutch.CreateBus(Configuration["MQ:Dev"]));
          ......
        }
    复制代码

      这里我将连接字符串写到了配置文件中,请参考上面的QuickStart中的内容。

      下面是这个demo用到的一个消息对象实体:通过标签声明队列名称。

    复制代码
        [Queue("Qka.Client", ExchangeName = "Qka.Client")]
        public class ClientMessage
        {
            public int ClientId { get; set; }
            public string ClientName { get; set; }
            public string Sex { get; set; }
            public int Age { get; set; }
            // N: Non-Smoker, S: Smoker
            public string SmokerCode { get; set; }
            // Bachelor, Master, Doctor
            public string Education { get; set; }
            public decimal YearIncome { get; set; }
        }
    复制代码

      此外,为了充分简化代码量,EasyNetQ提供了一个AutoSubscriber的方式,可以通过接口和标签快速地让一个类成为Consumer。详细内容参考:https://github.com/EasyNetQ/EasyNetQ/wiki/Auto-Subscriber

      这里为了快速的在项目中使用Subscriber,添加一个扩展方法,它会从注入的服务中取出IBus实例对象,并自动帮我们进行Subscriber(那些实现了IConsume接口的类)的注册。具体用法见后面的介绍。

    复制代码
        public static class AppBuilderExtension
        {
            public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
            {
                var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;
    
                var lifeTime = services.GetService<IApplicationLifetime>();
                var bus = services.GetService<IBus>();
                lifeTime.ApplicationStarted.Register(() =>
                {
                    var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
                    subscriber.Subscribe(assembly);
                    subscriber.SubscribeAsync(assembly);
                });
    
                lifeTime.ApplicationStopped.Register(() => bus.Dispose());
    
                return appBuilder;
            }
        }
    复制代码

    4.3 Publisher:ClientService

      ClientService作为发布者,这里假设我们在API中处理完业务代码后,将message发布给RabbitMQ:

    复制代码
        [Produces("application/json")]
        [Route("api/Client")]
        public class ClientController : Controller
        {
            private readonly IClientService clientService;
            private readonly IBus bus;
    
            public ClientController(IClientService _clientService, IBus _bus)
            {
                clientService = _clientService;
                bus = _bus;
            }
    
            ......
    
            [HttpPost]
            public async Task<string> Post([FromBody]ClientDTO clientDto)
            {
                // Business Logic here...
                // eg.Add new client to your service databases via EF
                // Sample Publish
                ClientMessage message = new ClientMessage
                {
                    ClientId = clientDto.Id.Value,
                    ClientName = clientDto.Name,
                    Sex = clientDto.Sex,
                    Age = 29,
                    SmokerCode = "N",
                    Education = "Master",
                    YearIncome = 100000
                };
                await bus.PublishAsync(message);
    
                return "Add Client Success! You will receive some letter later.";
            }
        }
    复制代码

      当然,你可以使用同步方法:bus.Publish(message);

    4.4 Subscriber: NoticeService & ZAPEngineService

      (1)NoticeService:新增一个实现IConsume接口的Consumer类

    复制代码
        public class ClientMessageConsumer: IConsumeAsync<ClientMessage>
        {
            [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.Notice")]
            public Task ConsumeAsync(ClientMessage message)
            {
                // Your business logic code here
                // eg.Build one email to client via SMTP service
                // Sample console code
                System.Console.ForegroundColor = System.ConsoleColor.Red;
                System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will send one email to client.", message.ClientName);
                System.Console.ResetColor();
    
                return Task.CompletedTask;
            }
        }
    复制代码

      这里为了演示效果,增加了一些输出信息的代码,下面的ZAPEngineService也是一样,不再赘述。

      (2)ZAPEngineService:新增一个实现IConsume接口的Consumer类

    复制代码
        public class ClientMessageConsumer : IConsumeAsync<ClientMessage>
        {
            [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.ZapQuestion")]
            public Task ConsumeAsync(ClientMessage message)
            {
                // Your business logic code here
                // eg.Generate one ZAP question records into database and send to client
                // Sample console code
                System.Console.ForegroundColor = System.ConsoleColor.Red;
                System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will generate one ZAP question list to client", message.ClientName);
                System.Console.ResetColor();
    
                return Task.CompletedTask;
            }
        }
    复制代码

      注意两个Consumer的SubscriptionId不能一样,否则无法接受到消息。

      (3)为两个Consumer使用扩展方法:UseSubscribe

    复制代码
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
        {
            ......
    
            // easyNetQ
            app.UseSubscribe("ClientMessageService", Assembly.GetExecutingAssembly());
        }
    复制代码

    4.5 简单测试

      (1)借助Postman向ClientService发起Post请求

      (2)查看NoticeService的日志信息

      (3)查看ZAPEngineService的日志信息

      (4)查看RabbitMQ的管理控制台:

    五、小结

      本篇超级简单地介绍了一下消息队列与RabbitMQ,通过使用EasyNetQ这个基于RabbitMQ.Client的客户端做了一个QuickStart演示了在.NET Core环境下如何进行消息的发布与订阅,并通过一个微服务的小案例演示了如何在ASP.NET Core环境下如何基于EasyNetQ完成消息的发布与订阅,看起来就像一个类似于简单的事件总线。当然,本篇的内容都十分基础,如果要应用好RabbitMQ,还得把那些基础概念(如:Channel,Exchange等)弄清楚,然后去理解一下事件总线的概念,实际中还得考虑数据一致性等等,路途漫漫,继续加油吧!

    示例代码

      Click Here => 点我下载

    参考资料

    EasyNetQ官方文档:https://github.com/EasyNetQ/EasyNetQ/wiki/Introduction

    focus-lei,《.net core使用EasyNetQ做EventBus

    常山造纸农,《RabbitMQ安装配置和基于EasyNetQ驱动的基础使用

  • 相关阅读:
    有点忙啊
    什么是协程
    HDU 1110 Equipment Box (判断一个大矩形里面能不能放小矩形)
    HDU 1155 Bungee Jumping(物理题,动能公式,弹性势能公式,重力势能公式)
    HDU 1210 Eddy's 洗牌问题(找规律,数学)
    HDU1214 圆桌会议(找规律,数学)
    HDU1215 七夕节(模拟 数学)
    HDU 1216 Assistance Required(暴力打表)
    HDU 1220 Cube(数学,找规律)
    HDU 1221 Rectangle and Circle(判断圆和矩形是不是相交)
  • 原文地址:https://www.cnblogs.com/webenh/p/11695947.html
Copyright © 2011-2022 走看看