zoukankan      html  css  js  c++  java
  • RabbitMQ学习ASP.NET Core中使用RabbitMQ

    代码参考连接:https://www.bilibili.com/video/BV1GU4y1w7Yq?p=9&spm_id_from=pageDriver

    1.项目结构

    项目类型为ASP.NET Core Web API

     

     

       

       

    2.Infrastructure类库代码 

    nuget添加RabbitMQ.Client的引用

     (1) RabbitConnection.cs代码

     1 using RabbitMQ.Client;
     2 using System;
     3 using System.Collections.Generic;
     4 using System.Text;
     5 
     6 namespace Infrastructure.Config
     7 {
     8     public class RabbitConnection
     9     {
    10         private readonly RabbitOption _config;
    11         private IConnection _connection = null;
    12         public RabbitConnection(RabbitOption rabbitOption)
    13         {
    14             _config = rabbitOption;
    15         }
    16 
    17         public IConnection GetConnection()
    18         {
    19             if (_connection == null)
    20             {
    21                 if (string.IsNullOrEmpty(_config.Address))
    22                 {
    23                     ConnectionFactory factory = new ConnectionFactory();
    24                     factory.HostName = _config.HostName;
    25                     factory.Port = _config.Port;
    26                     factory.UserName = _config.UserName;
    27                     factory.Password = _config.Password;
    28                     factory.VirtualHost = _config.VirtualHost;
    29                     _connection = factory.CreateConnection();
    30                 }
    31                 else
    32                 {
    33                     ConnectionFactory factory = new ConnectionFactory();
    34                     factory.UserName = _config.UserName;
    35                     factory.Password = _config.Password;
    36                     factory.VirtualHost = _config.VirtualHost;
    37 
    38                     var address = _config.Address;
    39                     List<AmqpTcpEndpoint> endpoints = new List<AmqpTcpEndpoint>();
    40                     foreach (var endpoint in address.Split(","))
    41                     {
    42                         endpoints.Add(new AmqpTcpEndpoint(endpoint.Split(":")[0],int.Parse(endpoint.Split(":")[1])));
    43                     }
    44                     _connection = factory.CreateConnection(endpoints);
    45                 }
    46             }
    47             return _connection;
    48         }
    49     }
    50 }

    (2)RabbitConstant.cs代码

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 
     5 namespace Infrastructure.Config
     6 {
     7     public class RabbitConstant
     8     {
     9         public const string TEST_EXCHANGE = "test.exchange";
    10         public const string TEST_QUEUE = "test.queue";
    11     }
    12 }

    (3)RabbitOption.cs代码

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 
     5 namespace Infrastructure.Config
     6 {
     7     public class RabbitOption
     8     {
     9         /// <summary>
    10         /// 主机名称
    11         /// </summary>
    12         public string HostName { get; set; }
    13         public string Address { get; set; }
    14         /// <summary>
    15         /// 端口号
    16         /// </summary>
    17         public int Port { get; set; }
    18         public string UserName { get; set; }
    19         public string Password { get; set; }
    20 
    21         public string VirtualHost { get; set; }
    22     }
    23 }

    (4) QueueInfo.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 
     5 namespace Infrastructure.Consumer
     6 {
     7     public class QueueInfo
     8     {
     9         /// <summary>
    10         /// 队列名称
    11         /// </summary>
    12         public string Queue { get; set; }
    13         /// <summary>
    14         /// 路由名称
    15         /// </summary>
    16         public string RoutingKey { get; set; }
    17         /// <summary>
    18         /// 交换机类型
    19         /// </summary>
    20         public string ExchangeType { get; set; }
    21         /// <summary>
    22         /// 交换机名称
    23         /// </summary>
    24         public string Exchange { get; set; }
    25         public IDictionary<string, object> props { get; set; } = null;
    26         public Action<RabbitMessageEntity> OnRecevied { get; set; }
    27     }
    28 }

    (5)RabbitChannelConfig.cs

     1 using RabbitMQ.Client;
     2 using RabbitMQ.Client.Events;
     3 using System;
     4 using System.Collections.Generic;
     5 using System.Text;
     6 
     7 namespace Infrastructure.Consumer
     8 {
     9     public class RabbitChannelConfig
    10     {
    11         public string ExchangeTypeName { get; set; }
    12         public string ExchangeName { get; set; }
    13         public string QueueName { get; set; }
    14         public string RoutingKeyName { get; set; }
    15         public IConnection Connection { get; set; }
    16         public EventingBasicConsumer Consumer { get; set; }
    17 
    18         /// <summary>
    19         /// 外部订阅消费者通知委托
    20         /// </summary>
    21         public Action<RabbitMessageEntity> OnReceivedCallback { get; set; }
    22 
    23         public RabbitChannelConfig(string exchangeType,string exchange,string queue,string routingKey)
    24         {
    25             this.ExchangeTypeName = exchangeType;
    26             this.ExchangeName = exchange;
    27             this.QueueName = queue;
    28             this.RoutingKeyName = routingKey;
    29         }
    30 
    31         public void Receive(object sender,BasicDeliverEventArgs args)
    32         {
    33             RabbitMessageEntity body = new RabbitMessageEntity();
    34             try
    35             {
    36                 string content = Encoding.UTF8.GetString(args.Body.ToArray());
    37                 body.Content = content;
    38                 body.Consumer =(EventingBasicConsumer)sender;
    39                 body.BasicDeliver = args;
    40             }
    41             catch (Exception e)
    42             {
    43                 body.ErrorMessage = $"订阅出错{e.Message}";
    44                 body.Exception = e;
    45                 body.Error = true;
    46                 body.Code = 500;
    47             }
    48             OnReceivedCallback?.Invoke(body);
    49         }
    50     }
    51 }

    (6)RabbitChannelManager.cs

     1 using Infrastructure.Config;
     2 using RabbitMQ.Client;
     3 using RabbitMQ.Client.Events;
     4 using System;
     5 using System.Collections.Generic;
     6 using System.Text;
     7 
     8 namespace Infrastructure.Consumer
     9 {
    10     public class RabbitChannelManager
    11     {
    12         public RabbitConnection Connection { get; set; }
    13 
    14         public RabbitChannelManager(RabbitConnection connection)
    15         {
    16             this.Connection = connection;
    17         }
    18 
    19         /// <summary>
    20         /// 创建接收消息的通道
    21         /// </summary>
    22         /// <param name="exchangeType"></param>
    23         /// <param name="exchange"></param>
    24         /// <param name="queue"></param>
    25         /// <param name="routingKey"></param>
    26         /// <param name="arguments"></param>
    27         /// <returns></returns>
    28         public RabbitChannelConfig CreateReceiveChannel(string exchangeType,string exchange,string queue,string routingKey,
    29             IDictionary<string,object>arguments = null)
    30         {
    31             IModel model = this.CreateModel(exchangeType,exchange,queue,routingKey,arguments);
    32             EventingBasicConsumer consumer = this.CreateConsumer(model,queue);
    33             RabbitChannelConfig channel = new RabbitChannelConfig(exchangeType,exchange,queue,routingKey);
    34             consumer.Received += channel.Receive;
    35             return channel;
    36         }
    37 
    38       
    39         /// <summary>
    40         /// 创建一个通道,包含交换机/队列/路由,并建立绑定关系
    41         /// </summary>
    42         /// <param name="exchangeType">交换机类型:Topic,Direct,Fanout</param>
    43         /// <param name="exchange">交换机名称</param>
    44         /// <param name="queue">队列名称</param>
    45         /// <param name="routingKey">路由名称</param>
    46         /// <param name="arguments"></param>
    47         /// <returns></returns>
    48         private IModel CreateModel(string exchangeType, string exchange, string queue, string routingKey, IDictionary<string, object> arguments)
    49         {
    50             exchangeType = string.IsNullOrEmpty(exchangeType) ? "default" : exchangeType;
    51             IModel model = this.Connection.GetConnection().CreateModel();
    52             model.BasicQos(0,1,false);
    53             model.QueueDeclare(queue,true,false,false,arguments);
    54             model.ExchangeDeclare(exchange, exchangeType);
    55             model.QueueBind(queue, exchange, routingKey);
    56             return model;
    57         }
    58 
    59         /// <summary>
    60         /// 创建消费者
    61         /// </summary>
    62         /// <param name="model"></param>
    63         /// <param name="queue"></param>
    64         /// <returns></returns>
    65         private EventingBasicConsumer CreateConsumer(IModel model, string queue)
    66         {
    67             EventingBasicConsumer consumer = new EventingBasicConsumer(model);
    68             model.BasicConsume(queue, false, consumer);
    69             return consumer;
    70         }
    71     }
    72 }

    (7)RabbitMessageEntity.cs

     1 using RabbitMQ.Client.Events;
     2 using System;
     3 using System.Collections.Generic;
     4 using System.Text;
     5 
     6 namespace Infrastructure.Consumer
     7 {
     8     public class RabbitMessageEntity
     9     {
    10         public string Content { get; set; }
    11         public EventingBasicConsumer Consumer { get; set; }
    12         public BasicDeliverEventArgs BasicDeliver { get; set; }
    13         public string ErrorMessage { get; set; }
    14         public Exception Exception { get; set; }
    15         public bool Error { get; set; }
    16         public int Code { get; set; }
    17     }
    18 }

    (8)IRabbitProducer.cs

     1 using System;
     2 using System.Collections.Generic;
     3 
     4 namespace Infrastructure.Producer
     5 {
     6     public interface IRabbitProducer
     7     {
     8         public void Publish(string exchange,string routingKey,IDictionary<string,object> props,string content);
     9     }
    10 }

    (9)RabbitProducer.cs

     1 using Infrastructure.Config;
     2 using System;
     3 using System.Collections.Generic;
     4 using System.Text;
     5 
     6 namespace Infrastructure.Producer
     7 {
     8     public class RabbitProducer : IRabbitProducer
     9     {
    10         private readonly RabbitConnection _connection;
    11         public RabbitProducer(RabbitConnection connection)
    12         {
    13             _connection = connection;
    14         }
    15         public void Publish(string exchange, string routingKey, IDictionary<string, object> props, string content)
    16         {
    17             var channel = _connection.GetConnection().CreateModel();
    18             var prop = channel.CreateBasicProperties();
    19             if (props.Count > 0)
    20             {
    21                 var delay = props["x-delay"];
    22                 prop.Expiration = delay.ToString();
    23             }
    24             channel.BasicPublish(exchange, routingKey, false, prop, Encoding.UTF8.GetBytes(content));
    25         }
    26     }
    27 }

    3.RabbitMQ.Services类库代码

    添加Infrastructure的项目引用

    (1)IOrderService.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 
     5 namespace RabbitMQ.Services
     6 {
     7     public interface IOrderService
     8     {
     9         void SendOrderMessage();
    10         void SendTestMessage(string message);
    11     }
    12 }

    (2) OrderService.cs

     1 using Infrastructure.Config;
     2 using Infrastructure.Producer;
     3 using System;
     4 using System.Collections.Generic;
     5 using System.Text;
     6 
     7 namespace RabbitMQ.Services
     8 {
     9     public class OrderService : IOrderService
    10     {
    11         private readonly IRabbitProducer _rabbitProducer;
    12         public OrderService(IRabbitProducer rabbitProducer)
    13         {
    14             _rabbitProducer = rabbitProducer;
    15         }
    16         public void SendOrderMessage()
    17         {
    18             throw new NotImplementedException();
    19         }
    20 
    21         public void SendTestMessage(string message)
    22         {
    23             Console.WriteLine($"send message:{message}");
    24             _rabbitProducer.Publish(RabbitConstant.TEST_EXCHANGE,"",new Dictionary<string,object>(),message);
    25         }
    26     }
    27 }

    4. RabbitMQ.Test控制台代码

    添加RabbitMQ.WebApi.Order项目引用

    复制项目RabbitMQ.WebApi.Order下的appsettings.json到此项目中

    (1)ProcessTest.cs

     1 using Infrastructure.Config;
     2 using Infrastructure.Consumer;
     3 using Microsoft.Extensions.Hosting;
     4 using RabbitMQ.Client;
     5 using System;
     6 using System.Collections.Generic;
     7 using System.Text;
     8 using System.Threading;
     9 using System.Threading.Tasks;
    10 
    11 namespace RabbitMQ.Test
    12 {
    13     public class ProcessTest : IHostedService
    14     {
    15         private readonly RabbitConnection _connection;
    16         public List<QueueInfo> Queues { get; } = new List<QueueInfo>();
    17         public ProcessTest(RabbitConnection connection)
    18         {
    19             _connection = connection;
    20             Queues.Add(new QueueInfo()
    21             { 
    22                ExchangeType = ExchangeType.Fanout,
    23                Exchange = RabbitConstant.TEST_EXCHANGE,
    24                Queue = RabbitConstant.TEST_QUEUE,
    25                RoutingKey = "",
    26                OnRecevied = this.Receive
    27             });
    28         }
    29 
    30         private void Receive(RabbitMessageEntity message)
    31         {
    32             Console.WriteLine($"Test Receive Message:{message.Content}");
    33             message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag,true);
    34         }
    35 
    36         public Task StartAsync(CancellationToken cancellationToken)
    37         {
    38             Console.WriteLine("RabbitMQ测试消息接收处理服务正在启动");
    39             RabbitChannelManager channelManager = new RabbitChannelManager(_connection);
    40             foreach (var queueInfo in Queues)
    41             {
    42                 RabbitChannelConfig channel = channelManager.CreateReceiveChannel(queueInfo.ExchangeType,
    43                     queueInfo.Exchange,queueInfo.Queue,queueInfo.RoutingKey);
    44                 channel.OnReceivedCallback = queueInfo.OnRecevied;
    45             }
    46             Console.WriteLine("RabbitMQ测试消息接收处理服务已经启动");
    47             return Task.CompletedTask;
    48         }
    49 
    50         public Task StopAsync(CancellationToken cancellationToken)
    51         {
    52             return Task.CompletedTask;
    53         }
    54     }
    55 }

    (2) Program.cs

     1 using Infrastructure.Config;
     2 using Microsoft.Extensions.Configuration;
     3 using Microsoft.Extensions.DependencyInjection;
     4 using Microsoft.Extensions.Hosting;
     5 using System;
     6 
     7 namespace RabbitMQ.Test
     8 {
     9     class Program
    10     {
    11         static void Main(string[] args)
    12         {
    13             var section = new ConfigurationBuilder()
    14                 .AddJsonFile("appsettings.json")
    15                 .Build()
    16                 .GetSection("RabbitMQ");
    17             var host = new HostBuilder()
    18                 .ConfigureServices(services =>
    19                  services.AddSingleton(new RabbitConnection(section.Get<RabbitOption>()))
    20                          .AddSingleton<IHostedService,ProcessTest>())
    21                 .Build();
    22              host.Run();
    23         }
    24     }
    25 }

    5. RabbitMQ.WebApi.Order代码

    nuget添加Swashbuckle.AspNetCore引用

    添加项目Infrastructure、RabbitMQ.Services引用

    (1) OrderController.cs

     1 using Microsoft.AspNetCore.Http;
     2 using Microsoft.AspNetCore.Mvc;
     3 using RabbitMQ.Services;
     4 using System;
     5 using System.Collections.Generic;
     6 using System.Linq;
     7 using System.Threading.Tasks;
     8 
     9 namespace RabbitMQ.WebApi.Order.Controllers
    10 {
    11     [Route("api/[controller]")]
    12     [ApiController]
    13     public class OrderController : ControllerBase
    14     {
    15         private readonly IOrderService _orderService;
    16 
    17         public OrderController(IOrderService orderService)
    18         {
    19             _orderService = orderService;
    20         }
    21 
    22         //[HttpGet]
    23         //public IActionResult Order()
    24         //{
    25         //    _orderService.SendOrderMessage();
    26         //    return Ok();
    27         //}
    28         [HttpGet("test")]
    29         public IActionResult Test(string message)
    30         {
    31             _orderService.SendTestMessage(message);
    32             return Ok();
    33         }
    34 
    35     }
    36 }

    (2) ServiceExtensions.cs

     1 using Infrastructure.Config;
     2 using Microsoft.Extensions.Configuration;
     3 using Microsoft.Extensions.DependencyInjection;
     4 using System;
     5 using System.Collections.Generic;
     6 using System.Linq;
     7 using System.Threading.Tasks;
     8 
     9 namespace RabbitMQ.WebApi.Order.Extensions
    10 {
    11     public static class ServiceExtensions
    12     {
    13         public static void ConfigureCors(this IServiceCollection services)
    14         {
    15             services.AddCors(options=>
    16             {
    17                 options.AddPolicy("AnyPolicy",
    18                     builder=>builder.AllowAnyOrigin()
    19                                     .AllowAnyMethod()
    20                                     .AllowAnyHeader());
    21             });
    22         }
    23         public static void ConfigureRabbitContext(this IServiceCollection services,IConfiguration config)
    24         {
    25             var section = config.GetSection("RabbitMQ");
    26             services.AddSingleton(
    27                   new RabbitConnection(section.Get<RabbitOption>())); 
    28         }
    29     }
    30 }

    (3) Startup.cs

     1 using Microsoft.AspNetCore.Builder;
     2 using Microsoft.AspNetCore.Hosting;
     3 using Microsoft.AspNetCore.Mvc;
     4 using Microsoft.Extensions.Configuration;
     5 using Microsoft.Extensions.DependencyInjection;
     6 using Microsoft.Extensions.Hosting;
     7 using Microsoft.Extensions.Logging;
     8 using System;
     9 using System.Collections.Generic;
    10 using System.Linq;
    11 using System.Threading.Tasks;
    12 using RabbitMQ.WebApi.Order.Extensions;
    13 using Microsoft.OpenApi.Models;
    14 using RabbitMQ.Services;
    15 using Infrastructure.Producer;
    16 
    17 namespace RabbitMQ.WebApi.Order
    18 {
    19     public class Startup
    20     {
    21         public Startup(IConfiguration configuration)
    22         {
    23             Configuration = configuration;
    24         }
    25 
    26         public IConfiguration Configuration { get; }
    27 
    28         // This method gets called by the runtime. Use this method to add services to the container.
    29         public void ConfigureServices(IServiceCollection services)
    30         {
    31             services.AddControllers();
    32             services.AddSwaggerGen(c=>
    33             {
    34                 c.SwaggerDoc("v1",new OpenApiInfo { Title="RabbitMQ.WebApi.Order",Version="v1"});
    35             });
    36             services.AddScoped<IOrderService, OrderService>();
    37             services.AddScoped<IRabbitProducer,RabbitProducer>();
    38             services.ConfigureCors();
    39             services.ConfigureRabbitContext(Configuration);
    40         }
    41 
    42         // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    43         public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    44         {
    45             if (env.IsDevelopment())
    46             {
    47                 app.UseDeveloperExceptionPage();
    48                 app.UseSwagger();
    49                 app.UseSwaggerUI(c=>c.SwaggerEndpoint("/swagger/v1/swagger.json","RabbitMQ.WebApi.Order v1"));
    50             }
    51 
    52             app.UseRouting();
    53 
    54             app.UseAuthorization();
    55 
    56             app.UseEndpoints(endpoints =>
    57             {
    58                 endpoints.MapControllers();
    59             });
    60         }
    61     }
    62 }

    (4) appsettings.json

    {
      "Logging": {
        "LogLevel": {
          "Default": "Information",
          "Microsoft": "Warning",
          "Microsoft.Hosting.Lifetime": "Information"
        }
      },
      "RabbitMQ": {
        "HostName": "127.0.0.1",
        "Address": "",
        "Port": 5672,
        "UserName": "guest",
        "Password": "guest",
        "VirtualHost": "/"
      },
    
      "AllowedHosts": "*"
    }

    6.运行

    (1)  powershell管理员模式运行RabbitMQ.Test项目

    PS D:\DotNetProject\RabbitMQ.WebApiDemo\RabbitMQ.Test> dotnet run

     (2)  powershell管理员模式运行RabbitMQ.WebApi.Order项目 

    PS D:\DotNetProject\RabbitMQ.WebApiDemo\RabbitMQ.WebApi.Order> dotnet run

     (3) 浏览器运行http://localhost:5000/swagger/index.html

     (4) 测试接口Test 

     (9)运行结果

     如需要项目源码,请留下邮箱,会将项目整个发送

    IRabbitProducer

  • 相关阅读:
    HDU 1075 What Are You Talking About(字典树)
    HDU 1075 What Are You Talking About (stl之map映射)
    HDU 1247 Hat’s Words(字典树活用)
    字典树HihoCoder
    HDU 1277全文检索(字典树)
    HDU 3294 Girls' research(manachar模板题)
    HDU 3294 Girls' research(manachar模板题)
    HDU 4763 Theme Section(KMP灵活应用)
    Ordering Tasks UVA
    Abbott's Revenge UVA
  • 原文地址:https://www.cnblogs.com/hobelee/p/15759617.html
Copyright © 2011-2022 走看看