zoukankan      html  css  js  c++  java
  • .NET Core中使用RabbitMQ正确方式

    .NET Core中使用RabbitMQ正确方式

    首先甩官网:http://www.rabbitmq.com/

    然后是.NET Client链接:http://www.rabbitmq.com/dotnet.html

    GitHub仓库:https://github.com/rabbitmq/rabbitmq-dotnet-client

    下面直接进入正文,一共是两个主题:消费者怎么写?生产者怎么写?

    消费者

    在 ASP.NET Core MVC 应用中,消费者不应该依赖某个 API 请求或其他外部操作来启动,而应该随应用程序一起启动.

    所以...

    在dotnet core 2.0以上版本,我们直接用 IHostedService 接口实现.

    直接上代码.

    // RabbitListener.cs:这是基类,只负责连接 RabbitMQ 并注册消息监听;每个消费者继承它,自行设置 RouteKey/QueueName,并重写消息处理函数 Process
    
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    
    namespace Test.Listener
    {
        /// <summary>
        /// Base hosted service that opens a RabbitMQ connection, binds a queue to the
        /// "message" topic exchange and dispatches every delivery to <see cref="Process"/>.
        /// Subclasses set <see cref="RouteKey"/> and <see cref="QueueName"/> and override
        /// <see cref="Process"/>.
        /// </summary>
        public class RabbitListener : IHostedService
        {
            // Topic exchange that every listener binds its queue to.
            private const string ExchangeName = "message";

            private readonly IConnection connection;
            private readonly IModel channel;

            /// <summary>
            /// Creates the connection and channel from the configured host, credentials and port.
            /// A failure is written to the console and leaves the listener unconnected;
            /// <see cref="Register"/> then fails with a descriptive exception.
            /// </summary>
            /// <param name="options">Application settings holding the RabbitMQ endpoint and credentials.</param>
            public RabbitListener(IOptions<AppConfiguration> options)
            {
                try
                {
                    var factory = new ConnectionFactory()
                    {
                        HostName = options.Value.RabbitHost,
                        UserName = options.Value.RabbitUserName,
                        Password = options.Value.RabbitPassword,
                        Port = options.Value.RabbitPort,
                    };
                    this.connection = factory.CreateConnection();
                    this.channel = connection.CreateModel();
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
                }
            }

            /// <summary>Registers the consumer when the host starts.</summary>
            public Task StartAsync(CancellationToken cancellationToken)
            {
                Register();
                return Task.CompletedTask;
            }

            /// <summary>Routing key used to bind the queue; set by the subclass.</summary>
            protected string RouteKey;

            /// <summary>Name of the queue to consume from; set by the subclass.</summary>
            protected string QueueName;

            /// <summary>
            /// Handles one message. Return true to acknowledge it, false to reject it.
            /// </summary>
            /// <param name="message">Message body decoded as UTF-8.</param>
            /// <exception cref="NotImplementedException">Always, unless overridden.</exception>
            public virtual bool Process(string message)
            {
                throw new NotImplementedException();
            }

            /// <summary>
            /// Declares the exchange and queue, binds them with <see cref="RouteKey"/> and
            /// starts consuming.
            /// </summary>
            /// <exception cref="InvalidOperationException">
            /// The connection could not be established in the constructor.
            /// </exception>
            public void Register()
            {
                if (channel == null)
                {
                    // Fail with a clear message instead of a NullReferenceException below.
                    throw new InvalidOperationException(
                        "RabbitListener cannot register: the RabbitMQ channel was not created (see the init error logged earlier).");
                }

                Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
                channel.ExchangeDeclare(exchange: ExchangeName, type: "topic");
                channel.QueueDeclare(queue: QueueName, exclusive: false);
                channel.QueueBind(queue: QueueName,
                                  exchange: ExchangeName,
                                  routingKey: RouteKey);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    bool handled;
                    try
                    {
                        // NOTE(review): this passes ea.Body straight to GetString, as the original did;
                        // confirm the Body type of the installed RabbitMQ.Client version supports that.
                        var message = Encoding.UTF8.GetString(ea.Body);
                        handled = Process(message);
                    }
                    catch (Exception ex)
                    {
                        // A throwing handler must not leave the delivery unsettled.
                        Console.WriteLine($"RabbitListener process error,ex:{ex.Message}");
                        handled = false;
                    }

                    if (handled)
                    {
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                    else
                    {
                        // Explicitly reject without requeue; otherwise the message stays
                        // unacknowledged on this channel until the connection closes.
                        channel.BasicReject(ea.DeliveryTag, false);
                    }
                };
                channel.BasicConsume(queue: QueueName, consumer: consumer);
            }

            /// <summary>Closes the connection, which stops the consumer.</summary>
            public void DeRegister()
            {
                CloseConnection();
            }

            /// <summary>Closes the connection when the host stops.</summary>
            public Task StopAsync(CancellationToken cancellationToken)
            {
                CloseConnection();
                return Task.CompletedTask;
            }

            // Closes the connection only if it was created and is still open, so that
            // calling DeRegister and StopAsync in any order is safe.
            private void CloseConnection()
            {
                if (connection != null && connection.IsOpen)
                {
                    connection.Close();
                }
            }
        }
    
    }
    
    
    
    // 随便贴一个子类
    
    using System;
    using System.Text;
    using Microsoft.Extensions.Options;
    using Newtonsoft.Json.Linq;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.Logging;
    
    namespace Test.Listener
    {
        /// <summary>
        /// Example listener bound to routing key "done.task" on queue "lemonnovelapi.chapter".
        /// </summary>
        public class ChapterLister : RabbitListener
        {
            private readonly ILogger<RabbitListener> _logger;

            // Process runs inside the consumer callback, outside any request scope, so
            // scoped services cannot be constructor-injected. Keep the root provider and
            // create a scope per message instead.
            private readonly IServiceProvider _services;

            /// <summary>Sets the routing key and queue name and stores the dependencies.</summary>
            /// <param name="services">Provider used to create a scope for each message.</param>
            /// <param name="options">Application settings passed to the base listener.</param>
            /// <param name="logger">Logger used to report processing failures.</param>
            public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options,
             ILogger<RabbitListener> logger) : base(options)
            {
                base.RouteKey = "done.task";
                base.QueueName = "lemonnovelapi.chapter";
                _logger = logger;
                _services = services;
            }

            /// <summary>
            /// Parses the message as JSON and handles it inside a fresh service scope.
            /// </summary>
            /// <param name="message">Message body, expected to be JSON.</param>
            /// <returns>
            /// true when the message was handled; false when it is empty, is not valid JSON,
            /// or handling threw. On false the message is not acknowledged.
            /// </returns>
            public override bool Process(string message)
            {
                try
                {
                    // Parsing is inside the try so malformed JSON yields false instead of
                    // an exception escaping into the consumer callback.
                    var taskMessage = JToken.Parse(message);
                    if (taskMessage == null || taskMessage.Type == JTokenType.Null)
                    {
                        // Nothing to process.
                        return false;
                    }

                    using (var scope = _services.CreateScope())
                    {
                        // Placeholder from the article: resolve the scoped service and do
                        // the real work with it here.
                        var xxxService = scope.ServiceProvider.GetRequiredService<XXXXService>();
                        return true;
                    }
                }
                catch (Exception ex)
                {
                    // One error entry carries the exception (with its stack trace) and the payload.
                    _logger.LogError(-1, ex, "Process fail,message:{Message}", message);
                    return false;
                }
            }
        }
    }
    
    

    然后,记住....

    在 Startup.cs 中注册时,使用 AddHostedService

      services.AddHostedService<ChapterLister>();
    

    消费者就这样玩了.

    生产者咋玩呢?

    这个其实更简单.

    
    using System;
    using System.Net;
    using Newtonsoft.Json.Linq;
    using RestSharp;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using RabbitMQ.Client;
    using Newtonsoft.Json;
    using System.Text;
    
    namespace Test.SDK
    {
        /// <summary>
        /// Publishes JSON-serialized messages to the "message" topic exchange.
        /// Intended to be registered as a singleton: each instance opens its own connection.
        /// </summary>
        public class RabbitMQClient : IDisposable
        {
            // Must match the exchange the consumers declare and bind to.
            private const string ExchangeName = "message";

            // The channel is shared by all callers of this instance; serialize access to it.
            private readonly object _publishLock = new object();

            private readonly IConnection _connection;

            private readonly IModel _channel;

            private readonly ILogger _logger;

            /// <summary>
            /// Opens the connection and channel and declares the topic exchange.
            /// A failure is logged and leaves the client unconnected;
            /// <see cref="PushMessage"/> then fails with a descriptive exception.
            /// </summary>
            /// <param name="options">Application settings holding the RabbitMQ endpoint and credentials.</param>
            /// <param name="logger">Logger for initialization errors and publish traces.</param>
            public RabbitMQClient(IOptions<AppConfiguration> options, ILogger<RabbitMQClient> logger)
            {
                _logger = logger;
                try
                {
                    var factory = new ConnectionFactory()
                    {
                        HostName = options.Value.RabbitHost,
                        UserName = options.Value.RabbitUserName,
                        Password = options.Value.RabbitPassword,
                        Port = options.Value.RabbitPort,
                    };
                    // Keep the connection in a field so Dispose can close it.
                    _connection = factory.CreateConnection();
                    _channel = _connection.CreateModel();
                    // Declare the exchange that is published to. The original declared a
                    // queue named "message", which is never bound to this exchange.
                    _channel.ExchangeDeclare(exchange: ExchangeName, type: "topic");
                }
                catch (Exception ex)
                {
                    logger.LogError(-1, ex, "RabbitMQClient init fail");
                }
            }

            /// <summary>
            /// Serializes <paramref name="message"/> to JSON and publishes it to the
            /// "message" exchange with the given routing key.
            /// </summary>
            /// <param name="routingKey">Routing key for the topic exchange.</param>
            /// <param name="message">Object to serialize as the message body.</param>
            /// <exception cref="ArgumentNullException"><paramref name="routingKey"/> is null.</exception>
            /// <exception cref="InvalidOperationException">The channel was not created or is closed.</exception>
            public virtual void PushMessage(string routingKey, object message)
            {
                if (routingKey == null)
                {
                    throw new ArgumentNullException(nameof(routingKey));
                }
                if (_channel == null || !_channel.IsOpen)
                {
                    // Fail with a clear message instead of a NullReferenceException below.
                    throw new InvalidOperationException(
                        "RabbitMQClient is not connected (see the init error logged earlier).");
                }

                _logger.LogInformation($"PushMessage,routingKey:{routingKey}");
                string msgJson = JsonConvert.SerializeObject(message);
                var body = Encoding.UTF8.GetBytes(msgJson);

                lock (_publishLock)
                {
                    _channel.BasicPublish(exchange: ExchangeName,
                                            routingKey: routingKey,
                                            basicProperties: null,
                                            body: body);
                }
            }

            /// <summary>Closes the channel and the connection.</summary>
            public void Dispose()
            {
                try
                {
                    _channel?.Dispose();
                    _connection?.Dispose();
                }
                catch (Exception ex)
                {
                    // Dispose must not throw; a failure while shutting down is only logged.
                    _logger.LogWarning(-1, ex, "RabbitMQClient dispose fail");
                }
            }
        }
    }
    

    切记注册实例时使用单例模式,否则每创建一个实例都会新建一个 RabbitMQ 连接.

    services.AddSingleton<RabbitMQClient, RabbitMQClient>();

    全文完...

  • 相关阅读:
    如何用js刷新aspxgridviw
    ASPxSpinEdit 控件的三元判断
    关于cookie
    asp.net解决数据转换为DBNULL的问题
    Devexpress 中如何写ASPxGridView新增修改时的数据验证
    ASPxGridView中批量提交及个别提交的写法
    c#中如何做日期的三元判断(日期不为空赋值)
    c#中如何不通过后台直接用js筛选gridview中的数据条件筛选查询?
    devexpress中如何绑定ASPxTreeList控件
    如何在后台动态生成ASPxCheckBoxList标签并循环(数据调用存储过程)
  • 原文地址:https://www.cnblogs.com/liguobao/p/10254247.html
Copyright © 2011-2022 走看看