zoukankan      html  css  js  c++  java
  • NETCORE

    NETCORE - RabbitMQ的使用

     NET Core微服务之基于EasyNetQ使用RabbitMQ消息队列

     

    安装部署RabbbitMQ:https://www.cnblogs.com/1285026182YUAN/p/12896851.html 

    NETCORE中的订阅模式

    一、控制台项目

    创建三个项目:

    • NETCORE.RabbitMQ.Publisher:Console项目
    • NETCORE.RabbitMQ.Subscriber1:Console项目
    • NETCORE.RabbitMQ.Messages:类库

     Publisher 与 Subscriber1 项目安装 EasyNetQ 插件

    NuGet>Install-Package EasyNetQ  

    Messages 项目,新增一个类

        public class TextMessage
        {
            public string Text { get; set; }
        }

    Publisher项目,添加以下代码

    using EasyNetQ;
    using NETCORE.RabbitMQ.Messages;
    using System;
    
    namespace NETCORE.RabbitMQ.Publisher
    {
        class Program
        {
            static void Main(string[] args)
            {
                var connStr = "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456";
    
                using (var bus = RabbitHutch.CreateBus(connStr))
                {
                    var input = "";
    
                    Console.WriteLine("I'm Publisher");
                    Console.WriteLine("Please enter a message. 'Quit' to quit.");
    
                    while ((input = Console.ReadLine()) != "Quit")
                    {
                        bus.Publish(new TextMessage
                        {
                            Text = input
                        });
                    }
                }
     
    
                Console.WriteLine("Hello World!");
            }
        }
    }
    View Code

    可以看到,我们在其中使用EasyNetQ高度封装的接口创建了一个IBus接口的实例,通过这个IBus实例我们可以通过一个超级Easy的Publish接口进行发布消息。这里主要是读取用户在控制台中输入的消息字符串进行发送。实际中,发送的一般都是一个或多个复杂的实体对象。

    Subscriber项目,添加以下代码

    using System;
    using EasyNetQ;
    using NETCORE.RabbitMQ.Messages;
    
    namespace NETCORE.RabbitMQ.Subscriber1
    {
        class Program
        {
            static void Main(string[] args)
            {
                var connStr = "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456";
    
                using (var bus = RabbitHutch.CreateBus(connStr))
                {
                    bus.Subscribe<TextMessage>("my_test_subscriptionid", HandleTextMessage);
    
    
                    Console.WriteLine("I'm Subscriber1");
                    Console.WriteLine("Listening for messages. Hit <return> to quit.");
                    Console.ReadLine();
                }
                 
                Console.WriteLine("Hello World!");
            }
    
            public static void HandleTextMessage(TextMessage textMessage)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("Got message: {0}", textMessage.Text);
                Console.ResetColor();
            }
        }
    }
    View Code

    这里主要是通过IBus实例去订阅消息(这里是除非用户关闭程序否则一直处于监听状态),当发布者发布了指定类型的消息之后,这里就把它打印出来(红色字体显示)。

    测试:

    通过控制台信息查看结果:

     

     

     

    二、 WebApi项目

      创建上面提到的这几个项目,这里我选择ASP.NET Core WebAPI类型。

      分别为这三个项目通过NuGet安装EasyNetQ组件

    PM> Install-Package EasyNetQ

    在messages 项目中 下面是这个demo用到的一个消息对象实体:通过标签声明队列名称。

    using System;
    using System.Collections.Generic;
    using System.Text;
    using EasyNetQ;
    
    namespace NETCORE.RabbitMQ.Messages
    {
    
        [Queue("Qka.Client", ExchangeName = "Qka.Client")]
        public class ClientMessage
        {
            public int ClientId { get; set; }
            public string ClientName { get; set; }
            public string Sex { get; set; }
            public int Age { get; set; }
            // N: Non-Smoker, S: Smoker
            public string SmokerCode { get; set; }
            // Bachelor, Master, Doctor
            public string Education { get; set; }
            public decimal YearIncome { get; set; }
        }
    }

     此外,为了充分简化代码量,EasyNetQ提供了一个AutoSubscriber的方式,可以通过接口和标签快速地让一个类成为Consumer。详细内容参考:https://github.com/EasyNetQ/EasyNetQ/wiki/Auto-Subscriber

      这里为了快速的在项目中使用Subscriber,添加一个扩展方法,它会从注入的服务中取出IBus实例对象,并自动帮我们进行Subscriber(那些实现了IConsume接口的类)的注册。具体用法见后面的介绍。

    Publisher项目:ClientService

      ClientService作为发布者,这里假设我们在API中处理完业务代码后,将message发布给RabbitMQ:

    appsettings.json 配置文件

    {
      "Logging": {
        "LogLevel": {
          "Default": "Warning"
        }
      },
      "ConnectionStrings": {
        "RabbitMQ_Conn": "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456"
      },
      "AllowedHosts": "*"
    }

    Startup.cs 文件,通过以下代码注入统一的IBus实例对象:

            public void ConfigureServices(IServiceCollection services)
            {
                services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
    
                services.AddSingleton(RabbitHutch.CreateBus(Configuration.GetConnectionString("RabbitMQ_Conn")));
            }

    创建控制器 ClientController 

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using EasyNetQ;
    using Microsoft.AspNetCore.Mvc;
    using NETCORE.RabbitMQ.Messages;
    
    namespace NETCORE.RabbitMQ.API.Publisher.Controllers
    {
        [Produces("application/json")]
        [Route("api/Client")]
        public class ClientController : Controller
        { 
            private readonly IBus bus;
    
            public ClientController(  IBus _bus)
            { 
                bus = _bus;
            }
             
            [Route("GetVs")]
            [HttpGet]
            public async Task<string> GetVs()
            {
                // Business Logic here...
                // eg.Add new client to your service databases via EF
                // Sample Publish
                ClientMessage message = new ClientMessage
                {
                    ClientId = 12,
                    ClientName = "lihongyuan",
                    Sex ="sss",
                    Age = 29,
                    SmokerCode = "N",
                    Education = "Master",
                    YearIncome = 100000
                };
                await bus.PublishAsync(message);
    
                return "Add Client Success! You will receive some letter later.";
            }
        }
    }
    View Code

    当然,你可以使用同步方法:bus.Publish(message);

     

    Subscriber项目:

      订阅者接收消息。

    appsettings.json 配置文件

    {
      "Logging": {
        "LogLevel": {
          "Default": "Warning"
        }
      },
      "ConnectionStrings": {
        "RabbitMQ_Conn": "host=192.168.122.199:5672;virtualHost=vhost_lihy;username=admin;password=123456"
      },
      "AllowedHosts": "*"
    }

    Startup.cs 文件,通过以下代码注入统一的IBus实例对象:

            public void ConfigureServices(IServiceCollection services)
            {
                services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
    
                services.AddSingleton(RabbitHutch.CreateBus(Configuration.GetConnectionString("RabbitMQ_Conn")));
            }

    创建 AppBuilderExtension.cs 类

    using EasyNetQ;
    using EasyNetQ.AutoSubscribe;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.DependencyInjection;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reflection;
    using System.Threading.Tasks;
    
    namespace NETCORE.RabbitMQ.API.Subscriber
    {
        public static class AppBuilderExtension
        {
            public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
            {
                var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;
    
                var lifeTime = services.GetService<IApplicationLifetime>();
                var bus = services.GetService<IBus>();
                lifeTime.ApplicationStarted.Register(() =>
                {
                    var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
                    subscriber.Subscribe(assembly);
                    subscriber.SubscribeAsync(assembly);
                });
    
                lifeTime.ApplicationStopped.Register(() => bus.Dispose());
    
                return appBuilder;
            }
        }
    }
    View Code

     startup.css 文件中 

    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
        {
            ......
    
            // easyNetQ
            app.UseSubscribe("ClientMessageService", Assembly.GetExecutingAssembly());
        }

    增加订阅类:

    using EasyNetQ.AutoSubscribe;
    using NETCORE.RabbitMQ.Messages;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    
    namespace NETCORE.RabbitMQ.API.Subscriber.Consumer
    {
        public class ClientMessageConsumer : IConsumeAsync<ClientMessage>
        {
            [AutoSubscriberConsumer(SubscriptionId = "ClientMessageService.ZapQuestion")]
            public Task ConsumeAsync(ClientMessage message)
            {
                // Your business logic code here
                // eg.Generate one ZAP question records into database and send to client
                // Sample console code
                System.Console.ForegroundColor = System.ConsoleColor.Red;
                System.Console.WriteLine("Consume one message from RabbitMQ : {0}, I will generate one ZAP question list to client", message.ClientName);
                System.Console.ResetColor();
    
                return Task.CompletedTask;
            }
        }
    }
    View Code

    启动后,可查看效果

    引用:https://www.cnblogs.com/edisonchou/p/aspnetcore_easynetq_basicdemo_foundation.html

  • 相关阅读:
    SQL Server 灾难恢复31天之第3天:在还原数据库时确定需要哪些备份文件
    发布订阅延迟故障排查案例:分发读进程延迟
    [分享]系统crash后SQL Server 在recovery时的rollback机制
    统计信息对执行计划的影响
    [荐][转]SQL SERVER SQLOS的任务调度
    SQL Server 灾难恢复31天之第1天:DBCC CHECK命令会自动使用已经存在的数据库快照吗?
    日志文件如何影响我数据库的启动
    SQL Server 2012 正式发布
    SQL Server 灾难恢复31天之第6天:管理区分配页损坏处理
    [转]Working Set和Private Bytes
  • 原文地址:https://www.cnblogs.com/1285026182YUAN/p/12896843.html
Copyright © 2011-2022 走看看