/// <summary>
/// Connection settings for the RabbitMQ broker, read from the "RabbitMq" node of the "Startup" settings.
/// </summary>
public class RabbitConf
{
    /// <summary>Broker host name.</summary>
    public string HostName { get; set; }

    /// <summary>User name used to authenticate against the broker.</summary>
    public string UserName { get; set; }

    /// <summary>Password used to authenticate against the broker.</summary>
    public string Password { get; set; }

    /// <summary>Broker port as text; when null or empty the connection factory's own default is kept.</summary>
    public string Port { get; set; }
}

/// <summary>
/// Base class for hosted services that consume messages from a RabbitMQ queue.
/// Derived classes set <see cref="Exchange"/>, <see cref="RouteKey"/> and <see cref="QueueName"/>
/// in their constructor and override <see cref="Process"/>.
/// </summary>
public abstract class RabbitListenerService : IHostedService
{
    private IConnection connection = null;
    private IModel channel = null;

    /// <summary>Name of the topic exchange the queue is bound to.</summary>
    protected string Exchange;

    /// <summary>Routing key used when binding the queue to the exchange.</summary>
    protected string RouteKey;

    /// <summary>Name of the queue to consume from.</summary>
    protected string QueueName;

    /// <summary>
    /// Opens the broker connection and channel from configuration.
    /// Failures are logged rather than thrown, so the connection and channel may remain
    /// null afterwards; <see cref="Register"/> checks for that.
    /// </summary>
    public RabbitListenerService()
    {
        try
        {
            var config = LocalAppsetting.GetSettingNode<RabbitConf>("Startup", "RabbitMq")?.FirstOrDefault();
            if (config == null)
            {
                // Fail with a descriptive message instead of a NullReferenceException in the log.
                throw new InvalidOperationException("RabbitMq configuration node (Startup:RabbitMq) was not found.");
            }

            var factory = new ConnectionFactory()
            {
                HostName = config.HostName,
                UserName = config.UserName,
                Password = config.Password,
            };
            if (!string.IsNullOrEmpty(config.Port))
            {
                factory.Port = Convert.ToInt32(config.Port);
            }

            connection = factory.CreateConnection();
            channel = connection.CreateModel();
        }
        catch (Exception ex)
        {
            Log4netHelper.Error(ex.Message + Environment.NewLine + ex.StackTrace);
        }
    }

    /// <summary>Starts consuming by calling <see cref="Register"/>.</summary>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>A completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        Register();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Handles one message. The default implementation yields a task faulted with
    /// <see cref="NotImplementedException"/>; derived classes are expected to override it.
    /// </summary>
    /// <param name="message">Message body decoded as UTF-8 text.</param>
    /// <returns>True to acknowledge the message; false to leave it unacknowledged.</returns>
    public virtual Task<bool> Process(string message)
    {
        // Same observable result as an async method that throws, without the await-less async body.
        return Task.FromException<bool>(new NotImplementedException());
    }

    /// <summary>
    /// Declares the topic exchange and the queue, binds them with <see cref="RouteKey"/>,
    /// and attaches a consumer that forwards each message to <see cref="Process"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The channel was not created because the constructor failed to connect.
    /// </exception>
    public void Register()
    {
        if (channel == null)
        {
            throw new InvalidOperationException(
                "RabbitMQ channel is not available; the connection could not be established (see earlier error log).");
        }

        channel.ExchangeDeclare(Exchange, "topic", true, false, null);
        channel.QueueDeclare(QueueName, true, false, false, null);
        channel.QueueBind(QueueName, Exchange, RouteKey);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            // This lambda is effectively async void: an exception escaping it cannot be observed
            // by the caller, so everything is wrapped and logged here.
            try
            {
                // The body is copied before the first await.
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                var result = await Process(message);
                if (result)
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                // When Process returns false the message is intentionally left unacknowledged.
            }
            catch (Exception ex)
            {
                Log4netHelper.Error(ex.Message + Environment.NewLine + ex.StackTrace);
            }
        };

        // NOTE(review): autoAck is not passed, so the overload's default applies —
        // presumably manual acknowledgement, matching the BasicAck call above; confirm.
        channel.BasicConsume(queue: QueueName, consumer: consumer);
    }

    /// <summary>Closes the broker connection if one was opened.</summary>
    public void DeRegister()
    {
        connection?.Close();
    }

    /// <summary>Closes the broker connection if one was opened.</summary>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        connection?.Close();
        return Task.CompletedTask;
    }
}
/// <summary>
/// Listener for the "RefundConfirm" queue on the "ErpRefund" exchange.
/// Each message is passed to <see cref="IRefundServices.RefundAudit"/>.
/// </summary>
public class RabbitErpFefundConfirmService : RabbitListenerService
{
    private readonly IRefundServices _refundServices;

    /// <summary>
    /// Stores the refund service and sets the exchange, queue and routing key
    /// this listener uses.
    /// </summary>
    /// <param name="refundServices">Service that performs the refund audit.</param>
    public RabbitErpFefundConfirmService(IRefundServices refundServices)
    {
        _refundServices = refundServices;

        Exchange = "ErpRefund";
        RouteKey = "RefundConfirm";
        QueueName = "RefundConfirm";
    }

    /// <summary>
    /// Logs the raw message, deserializes it as a JSON list of <c>RefundAuditRequest</c>
    /// and runs the refund audit on it.
    /// </summary>
    /// <param name="message">Message body as text.</param>
    /// <returns>
    /// The value returned by the refund audit, or false when any exception is thrown
    /// (the exception is logged, not rethrown).
    /// </returns>
    public override async Task<bool> Process(string message)
    {
        try
        {
            Log4netHelper.Info(message);

            List<RefundAuditRequest> requests =
                JsonConvert.DeserializeObject<List<RefundAuditRequest>>(message);

            return await _refundServices.RefundAudit(requests);
        }
        catch (Exception failure)
        {
            string details = string.Concat(failure.Message, Environment.NewLine, failure.StackTrace);
            Log4netHelper.Error(details);
            return false;
        }
    }
}
/// <summary>
/// Helper for publishing JSON messages to a RabbitMQ topic exchange.
/// </summary>
public class RabbitMqUtil
{
    /// <summary>
    /// Builds a new connection factory from the "RabbitMq" node of the "Startup" settings on every access.
    /// </summary>
    /// <exception cref="InvalidOperationException">The configuration node was not found.</exception>
    static ConnectionFactory GetConnectionFactory
    {
        get
        {
            var config = LocalAppsetting.GetSettingNode<RabbitConf>("Startup", "RabbitMq")?.FirstOrDefault();
            if (config == null)
            {
                // Descriptive failure instead of a NullReferenceException on config.HostName.
                throw new InvalidOperationException("RabbitMq configuration node (Startup:RabbitMq) was not found.");
            }

            var factory = new ConnectionFactory()
            {
                HostName = config.HostName,
                UserName = config.UserName,
                Password = config.Password,
            };
            if (!string.IsNullOrEmpty(config.Port))
            {
                factory.Port = Convert.ToInt32(config.Port);
            }

            return factory;
        }
    }

    /// <summary>
    /// Serializes <paramref name="msg"/> to JSON and publishes it to <paramref name="exchange"/>
    /// with <paramref name="routeKey"/>. A new connection and channel are opened for the call
    /// and disposed before it returns.
    /// </summary>
    /// <param name="exchange">Topic exchange to declare and publish to.</param>
    /// <param name="routeKey">Routing key for the published message.</param>
    /// <param name="msg">Object serialized with <c>JsonConvert.SerializeObject</c> as the message body.</param>
    /// <param name="queue">
    /// Optional queue name. When non-empty the queue is declared and bound to the exchange
    /// with the pattern <c>#.{routeKey}.#</c> before publishing.
    /// </param>
    /// <returns>
    /// True when the publish call completed without throwing; false when any exception occurred
    /// (the exception is logged, not rethrown).
    /// </returns>
    public static bool BasicPublish(string exchange, string routeKey, object msg, string queue = "")
    {
        try
        {
            using (var connection = GetConnectionFactory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "topic", true, false, null);

                if (!string.IsNullOrEmpty(queue))
                {
                    channel.QueueDeclare(queue, true, false, false, null);
                    channel.QueueBind(queue, exchange, $"#.{routeKey}.#", null);
                }

                var properties = channel.CreateBasicProperties();
                // NOTE(review): delivery mode 2 is the AMQP "persistent" mode — confirm against client docs.
                properties.DeliveryMode = 2;

                string message = JsonConvert.SerializeObject(msg);
                channel.BasicPublish(exchange, routeKey, properties, Encoding.UTF8.GetBytes(message));
                return true;
            }
        }
        catch (Exception err)
        {
            // Include the stack trace, as the other RabbitMQ classes in this file do when logging errors.
            Log4netHelper.Error("发送消息失败:" + err.Message + Environment.NewLine + err.StackTrace);
            return false;
        }
    }
}