zoukankan      html  css  js  c++  java
  • RabbitMQ学习ASP.NET Core中使用RabbitMQ

    代码参考连接:https://www.bilibili.com/video/BV1GU4y1w7Yq?p=9&spm_id_from=pageDriver

    1.项目结构

    项目类型为ASP.NET Core Web API

     

     

       

       

    2.Infrastructure类库代码 

    nuget添加RabbitMQ.Client的引用

     (1) RabbitConnection.cs代码

     1 using RabbitMQ.Client;
     2 using System;
     3 using System.Collections.Generic;
     4 using System.Text;
     5 
     6 namespace Infrastructure.Config
     7 {
     8     public class RabbitConnection
     9     {
    10         private readonly RabbitOption _config;
    11         private IConnection _connection = null;
    12         public RabbitConnection(RabbitOption rabbitOption)
    13         {
    14             _config = rabbitOption;
    15         }
    16 
    17         public IConnection GetConnection()
    18         {
    19             if (_connection == null)
    20             {
    21                 if (string.IsNullOrEmpty(_config.Address))
    22                 {
    23                     ConnectionFactory factory = new ConnectionFactory();
    24                     factory.HostName = _config.HostName;
    25                     factory.Port = _config.Port;
    26                     factory.UserName = _config.UserName;
    27                     factory.Password = _config.Password;
    28                     factory.VirtualHost = _config.VirtualHost;
    29                     _connection = factory.CreateConnection();
    30                 }
    31                 else
    32                 {
    33                     ConnectionFactory factory = new ConnectionFactory();
    34                     factory.UserName = _config.UserName;
    35                     factory.Password = _config.Password;
    36                     factory.VirtualHost = _config.VirtualHost;
    37 
    38                     var address = _config.Address;
    39                     List<AmqpTcpEndpoint> endpoints = new List<AmqpTcpEndpoint>();
    40                     foreach (var endpoint in address.Split(","))
    41                     {
    42                         endpoints.Add(new AmqpTcpEndpoint(endpoint.Split(":")[0],int.Parse(endpoint.Split(":")[1])));
    43                     }
    44                     _connection = factory.CreateConnection(endpoints);
    45                 }
    46             }
    47             return _connection;
    48         }
    49     }
    50 }

    (2)RabbitConstant.cs代码

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 
     5 namespace Infrastructure.Config
     6 {
     7     public class RabbitConstant
     8     {
     9         public const string TEST_EXCHANGE = "test.exchange";
    10         public const string TEST_QUEUE = "test.queue";
    11     }
    12 }

    (3)RabbitOption.cs代码

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 
     5 namespace Infrastructure.Config
     6 {
     7     public class RabbitOption
     8     {
     9         /// <summary>
    10         /// 主机名称
    11         /// </summary>
    12         public string HostName { get; set; }
    13         public string Address { get; set; }
    14         /// <summary>
    15         /// 端口号
    16         /// </summary>
    17         public int Port { get; set; }
    18         public string UserName { get; set; }
    19         public string Password { get; set; }
    20 
    21         public string VirtualHost { get; set; }
    22     }
    23 }

    (4) QueueInfo.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 
     5 namespace Infrastructure.Consumer
     6 {
     7     public class QueueInfo
     8     {
     9         /// <summary>
    10         /// 队列名称
    11         /// </summary>
    12         public string Queue { get; set; }
    13         /// <summary>
    14         /// 路由名称
    15         /// </summary>
    16         public string RoutingKey { get; set; }
    17         /// <summary>
    18         /// 交换机类型
    19         /// </summary>
    20         public string ExchangeType { get; set; }
    21         /// <summary>
    22         /// 交换机名称
    23         /// </summary>
    24         public string Exchange { get; set; }
    25         public IDictionary<string, object> props { get; set; } = null;
    26         public Action<RabbitMessageEntity> OnRecevied { get; set; }
    27     }
    28 }

    (5)RabbitChannelConfig.cs

     1 using RabbitMQ.Client;
     2 using RabbitMQ.Client.Events;
     3 using System;
     4 using System.Collections.Generic;
     5 using System.Text;
     6 
     7 namespace Infrastructure.Consumer
     8 {
     9     public class RabbitChannelConfig
    10     {
    11         public string ExchangeTypeName { get; set; }
    12         public string ExchangeName { get; set; }
    13         public string QueueName { get; set; }
    14         public string RoutingKeyName { get; set; }
    15         public IConnection Connection { get; set; }
    16         public EventingBasicConsumer Consumer { get; set; }
    17 
    18         /// <summary>
    19         /// 外部订阅消费者通知委托
    20         /// </summary>
    21         public Action<RabbitMessageEntity> OnReceivedCallback { get; set; }
    22 
    23         public RabbitChannelConfig(string exchangeType,string exchange,string queue,string routingKey)
    24         {
    25             this.ExchangeTypeName = exchangeType;
    26             this.ExchangeName = exchange;
    27             this.QueueName = queue;
    28             this.RoutingKeyName = routingKey;
    29         }
    30 
    31         public void Receive(object sender,BasicDeliverEventArgs args)
    32         {
    33             RabbitMessageEntity body = new RabbitMessageEntity();
    34             try
    35             {
    36                 string content = Encoding.UTF8.GetString(args.Body.ToArray());
    37                 body.Content = content;
    38                 body.Consumer =(EventingBasicConsumer)sender;
    39                 body.BasicDeliver = args;
    40             }
    41             catch (Exception e)
    42             {
    43                 body.ErrorMessage = $"订阅出错{e.Message}";
    44                 body.Exception = e;
    45                 body.Error = true;
    46                 body.Code = 500;
    47             }
    48             OnReceivedCallback?.Invoke(body);
    49         }
    50     }
    51 }

    (6)RabbitChannelManager.cs

     1 using Infrastructure.Config;
     2 using RabbitMQ.Client;
     3 using RabbitMQ.Client.Events;
     4 using System;
     5 using System.Collections.Generic;
     6 using System.Text;
     7 
     8 namespace Infrastructure.Consumer
     9 {
    10     public class RabbitChannelManager
    11     {
    12         public RabbitConnection Connection { get; set; }
    13 
    14         public RabbitChannelManager(RabbitConnection connection)
    15         {
    16             this.Connection = connection;
    17         }
    18 
    19         /// <summary>
    20         /// 创建接收消息的通道
    21         /// </summary>
    22         /// <param name="exchangeType"></param>
    23         /// <param name="exchange"></param>
    24         /// <param name="queue"></param>
    25         /// <param name="routingKey"></param>
    26         /// <param name="arguments"></param>
    27         /// <returns></returns>
    28         public RabbitChannelConfig CreateReceiveChannel(string exchangeType,string exchange,string queue,string routingKey,
    29             IDictionary<string,object>arguments = null)
    30         {
    31             IModel model = this.CreateModel(exchangeType,exchange,queue,routingKey,arguments);
    32             EventingBasicConsumer consumer = this.CreateConsumer(model,queue);
    33             RabbitChannelConfig channel = new RabbitChannelConfig(exchangeType,exchange,queue,routingKey);
    34             consumer.Received += channel.Receive;
    35             return channel;
    36         }
    37 
    38       
    39         /// <summary>
    40         /// 创建一个通道,包含交换机/队列/路由,并建立绑定关系
    41         /// </summary>
    42         /// <param name="exchangeType">交换机类型:Topic,Direct,Fanout</param>
    43         /// <param name="exchange">交换机名称</param>
    44         /// <param name="queue">队列名称</param>
    45         /// <param name="routingKey">路由名称</param>
    46         /// <param name="arguments"></param>
    47         /// <returns></returns>
    48         private IModel CreateModel(string exchangeType, string exchange, string queue, string routingKey, IDictionary<string, object> arguments)
    49         {
    50             exchangeType = string.IsNullOrEmpty(exchangeType) ? "default" : exchangeType;
    51             IModel model = this.Connection.GetConnection().CreateModel();
    52             model.BasicQos(0,1,false);
    53             model.QueueDeclare(queue,true,false,false,arguments);
    54             model.ExchangeDeclare(exchange, exchangeType);
    55             model.QueueBind(queue, exchange, routingKey);
    56             return model;
    57         }
    58 
    59         /// <summary>
    60         /// 创建消费者
    61         /// </summary>
    62         /// <param name="model"></param>
    63         /// <param name="queue"></param>
    64         /// <returns></returns>
    65         private EventingBasicConsumer CreateConsumer(IModel model, string queue)
    66         {
    67             EventingBasicConsumer consumer = new EventingBasicConsumer(model);
    68             model.BasicConsume(queue, false, consumer);
    69             return consumer;
    70         }
    71     }
    72 }

    (7)RabbitMessageEntity.cs

     1 using RabbitMQ.Client.Events;
     2 using System;
     3 using System.Collections.Generic;
     4 using System.Text;
     5 
     6 namespace Infrastructure.Consumer
     7 {
     8     public class RabbitMessageEntity
     9     {
    10         public string Content { get; set; }
    11         public EventingBasicConsumer Consumer { get; set; }
    12         public BasicDeliverEventArgs BasicDeliver { get; set; }
    13         public string ErrorMessage { get; set; }
    14         public Exception Exception { get; set; }
    15         public bool Error { get; set; }
    16         public int Code { get; set; }
    17     }
    18 }

    (8)IRabbitProducer.cs

     1 using System;
     2 using System.Collections.Generic;
     3 
     4 namespace Infrastructure.Producer
     5 {
     6     public interface IRabbitProducer
     7     {
     8         public void Publish(string exchange,string routingKey,IDictionary<string,object> props,string content);
     9     }
    10 }

    (9)RabbitProducer.cs

     1 using Infrastructure.Config;
     2 using System;
     3 using System.Collections.Generic;
     4 using System.Text;
     5 
     6 namespace Infrastructure.Producer
     7 {
     8     public class RabbitProducer : IRabbitProducer
     9     {
    10         private readonly RabbitConnection _connection;
    11         public RabbitProducer(RabbitConnection connection)
    12         {
    13             _connection = connection;
    14         }
    15         public void Publish(string exchange, string routingKey, IDictionary<string, object> props, string content)
    16         {
    17             var channel = _connection.GetConnection().CreateModel();
    18             var prop = channel.CreateBasicProperties();
    19             if (props.Count > 0)
    20             {
    21                 var delay = props["x-delay"];
    22                 prop.Expiration = delay.ToString();
    23             }
    24             channel.BasicPublish(exchange, routingKey, false, prop, Encoding.UTF8.GetBytes(content));
    25         }
    26     }
    27 }

    3.RabbitMQ.Services类库代码

    添加Infrastructure的项目引用

    (1)IOrderService.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 
     5 namespace RabbitMQ.Services
     6 {
     7     public interface IOrderService
     8     {
     9         void SendOrderMessage();
    10         void SendTestMessage(string message);
    11     }
    12 }

    (2) OrderService.cs

     1 using Infrastructure.Config;
     2 using Infrastructure.Producer;
     3 using System;
     4 using System.Collections.Generic;
     5 using System.Text;
     6 
     7 namespace RabbitMQ.Services
     8 {
     9     public class OrderService : IOrderService
    10     {
    11         private readonly IRabbitProducer _rabbitProducer;
    12         public OrderService(IRabbitProducer rabbitProducer)
    13         {
    14             _rabbitProducer = rabbitProducer;
    15         }
    16         public void SendOrderMessage()
    17         {
    18             throw new NotImplementedException();
    19         }
    20 
    21         public void SendTestMessage(string message)
    22         {
    23             Console.WriteLine($"send message:{message}");
    24             _rabbitProducer.Publish(RabbitConstant.TEST_EXCHANGE,"",new Dictionary<string,object>(),message);
    25         }
    26     }
    27 }

    4. RabbitMQ.Test控制台代码

    添加RabbitMQ.WebApi.Order项目引用

    复制项目RabbitMQ.WebApi.Order下的appsettings.json到此项目中

    (1)ProcessTest.cs

     1 using Infrastructure.Config;
     2 using Infrastructure.Consumer;
     3 using Microsoft.Extensions.Hosting;
     4 using RabbitMQ.Client;
     5 using System;
     6 using System.Collections.Generic;
     7 using System.Text;
     8 using System.Threading;
     9 using System.Threading.Tasks;
    10 
    11 namespace RabbitMQ.Test
    12 {
    13     public class ProcessTest : IHostedService
    14     {
    15         private readonly RabbitConnection _connection;
    16         public List<QueueInfo> Queues { get; } = new List<QueueInfo>();
    17         public ProcessTest(RabbitConnection connection)
    18         {
    19             _connection = connection;
    20             Queues.Add(new QueueInfo()
    21             { 
    22                ExchangeType = ExchangeType.Fanout,
    23                Exchange = RabbitConstant.TEST_EXCHANGE,
    24                Queue = RabbitConstant.TEST_QUEUE,
    25                RoutingKey = "",
    26                OnRecevied = this.Receive
    27             });
    28         }
    29 
    30         private void Receive(RabbitMessageEntity message)
    31         {
    32             Console.WriteLine($"Test Receive Message:{message.Content}");
    33             message.Consumer.Model.BasicAck(message.BasicDeliver.DeliveryTag,true);
    34         }
    35 
    36         public Task StartAsync(CancellationToken cancellationToken)
    37         {
    38             Console.WriteLine("RabbitMQ测试消息接收处理服务正在启动");
    39             RabbitChannelManager channelManager = new RabbitChannelManager(_connection);
    40             foreach (var queueInfo in Queues)
    41             {
    42                 RabbitChannelConfig channel = channelManager.CreateReceiveChannel(queueInfo.ExchangeType,
    43                     queueInfo.Exchange,queueInfo.Queue,queueInfo.RoutingKey);
    44                 channel.OnReceivedCallback = queueInfo.OnRecevied;
    45             }
    46             Console.WriteLine("RabbitMQ测试消息接收处理服务已经启动");
    47             return Task.CompletedTask;
    48         }
    49 
    50         public Task StopAsync(CancellationToken cancellationToken)
    51         {
    52             return Task.CompletedTask;
    53         }
    54     }
    55 }

    (2) Program.cs

     1 using Infrastructure.Config;
     2 using Microsoft.Extensions.Configuration;
     3 using Microsoft.Extensions.DependencyInjection;
     4 using Microsoft.Extensions.Hosting;
     5 using System;
     6 
     7 namespace RabbitMQ.Test
     8 {
     9     class Program
    10     {
    11         static void Main(string[] args)
    12         {
    13             var section = new ConfigurationBuilder()
    14                 .AddJsonFile("appsettings.json")
    15                 .Build()
    16                 .GetSection("RabbitMQ");
    17             var host = new HostBuilder()
    18                 .ConfigureServices(services =>
    19                  services.AddSingleton(new RabbitConnection(section.Get<RabbitOption>()))
    20                          .AddSingleton<IHostedService,ProcessTest>())
    21                 .Build();
    22              host.Run();
    23         }
    24     }
    25 }

    5. RabbitMQ.WebApi.Order代码

    nuget添加Swashbuckle.AspNetCore引用

    添加项目Infrastructure、RabbitMQ.Services引用

    (1) OrderController.cs

     1 using Microsoft.AspNetCore.Http;
     2 using Microsoft.AspNetCore.Mvc;
     3 using RabbitMQ.Services;
     4 using System;
     5 using System.Collections.Generic;
     6 using System.Linq;
     7 using System.Threading.Tasks;
     8 
     9 namespace RabbitMQ.WebApi.Order.Controllers
    10 {
    11     [Route("api/[controller]")]
    12     [ApiController]
    13     public class OrderController : ControllerBase
    14     {
    15         private readonly IOrderService _orderService;
    16 
    17         public OrderController(IOrderService orderService)
    18         {
    19             _orderService = orderService;
    20         }
    21 
    22         //[HttpGet]
    23         //public IActionResult Order()
    24         //{
    25         //    _orderService.SendOrderMessage();
    26         //    return Ok();
    27         //}
    28         [HttpGet("test")]
    29         public IActionResult Test(string message)
    30         {
    31             _orderService.SendTestMessage(message);
    32             return Ok();
    33         }
    34 
    35     }
    36 }

    (2) ServiceExtensions.cs

     1 using Infrastructure.Config;
     2 using Microsoft.Extensions.Configuration;
     3 using Microsoft.Extensions.DependencyInjection;
     4 using System;
     5 using System.Collections.Generic;
     6 using System.Linq;
     7 using System.Threading.Tasks;
     8 
     9 namespace RabbitMQ.WebApi.Order.Extensions
    10 {
    11     public static class ServiceExtensions
    12     {
    13         public static void ConfigureCors(this IServiceCollection services)
    14         {
    15             services.AddCors(options=>
    16             {
    17                 options.AddPolicy("AnyPolicy",
    18                     builder=>builder.AllowAnyOrigin()
    19                                     .AllowAnyMethod()
    20                                     .AllowAnyHeader());
    21             });
    22         }
    23         public static void ConfigureRabbitContext(this IServiceCollection services,IConfiguration config)
    24         {
    25             var section = config.GetSection("RabbitMQ");
    26             services.AddSingleton(
    27                   new RabbitConnection(section.Get<RabbitOption>())); 
    28         }
    29     }
    30 }

    (3) Startup.cs

     1 using Microsoft.AspNetCore.Builder;
     2 using Microsoft.AspNetCore.Hosting;
     3 using Microsoft.AspNetCore.Mvc;
     4 using Microsoft.Extensions.Configuration;
     5 using Microsoft.Extensions.DependencyInjection;
     6 using Microsoft.Extensions.Hosting;
     7 using Microsoft.Extensions.Logging;
     8 using System;
     9 using System.Collections.Generic;
    10 using System.Linq;
    11 using System.Threading.Tasks;
    12 using RabbitMQ.WebApi.Order.Extensions;
    13 using Microsoft.OpenApi.Models;
    14 using RabbitMQ.Services;
    15 using Infrastructure.Producer;
    16 
    17 namespace RabbitMQ.WebApi.Order
    18 {
    19     public class Startup
    20     {
    21         public Startup(IConfiguration configuration)
    22         {
    23             Configuration = configuration;
    24         }
    25 
    26         public IConfiguration Configuration { get; }
    27 
    28         // This method gets called by the runtime. Use this method to add services to the container.
    29         public void ConfigureServices(IServiceCollection services)
    30         {
    31             services.AddControllers();
    32             services.AddSwaggerGen(c=>
    33             {
    34                 c.SwaggerDoc("v1",new OpenApiInfo { Title="RabbitMQ.WebApi.Order",Version="v1"});
    35             });
    36             services.AddScoped<IOrderService, OrderService>();
    37             services.AddScoped<IRabbitProducer,RabbitProducer>();
    38             services.ConfigureCors();
    39             services.ConfigureRabbitContext(Configuration);
    40         }
    41 
    42         // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    43         public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    44         {
    45             if (env.IsDevelopment())
    46             {
    47                 app.UseDeveloperExceptionPage();
    48                 app.UseSwagger();
    49                 app.UseSwaggerUI(c=>c.SwaggerEndpoint("/swagger/v1/swagger.json","RabbitMQ.WebApi.Order v1"));
    50             }
    51 
    52             app.UseRouting();
    53 
    54             app.UseAuthorization();
    55 
    56             app.UseEndpoints(endpoints =>
    57             {
    58                 endpoints.MapControllers();
    59             });
    60         }
    61     }
    62 }

    (4) appsettings.json

    {
      "Logging": {
        "LogLevel": {
          "Default": "Information",
          "Microsoft": "Warning",
          "Microsoft.Hosting.Lifetime": "Information"
        }
      },
      "RabbitMQ": {
        "HostName": "127.0.0.1",
        "Address": "",
        "Port": 5672,
        "UserName": "guest",
        "Password": "guest",
        "VirtualHost": "/"
      },
    
      "AllowedHosts": "*"
    }

    6.运行

    (1)  powershell管理员模式运行RabbitMQ.Test项目

    PS D:\DotNetProject\RabbitMQ.WebApiDemo\RabbitMQ.Test> dotnet run

     (2)  powershell管理员模式运行RabbitMQ.WebApi.Order项目 

    PS D:\DotNetProject\RabbitMQ.WebApiDemo\RabbitMQ.WebApi.Order> dotnet run

     (3) 浏览器运行http://localhost:5000/swagger/index.html

     (4) 测试接口Test 

     (9)运行结果

     如需要项目源码,请留下邮箱,会将项目整个发送

    IRabbitProducer

  • 相关阅读:
    PHP chgrp() 函数
    PHP basename() 函数
    PHP 5 Filesystem 函数
    PHP rewinddir() 函数
    PHP readdir() 函数
    PHP opendir() 函数
    PHP getcwd() 函数
    PHP dir() 函数
    PHP closedir() 函数
    PHP chroot() 函数
  • 原文地址:https://www.cnblogs.com/hobelee/p/15759617.html
Copyright © 2011-2022 走看看