zoukankan      html  css  js  c++  java
  • netcore rabbitMq

        public class RabbitConf 
        {
            public string HostName { get; set; }
            public string UserName { get; set; }
            public string Password { get; set; }
            public string Port { get; set; }
        }
    
        public abstract class RabbitListenerService : IHostedService
        {
            private IConnection connection = null;
            private IModel channel = null;
    
            protected string Exchange;
            protected string RouteKey;
            protected string QueueName;
    
            public RabbitListenerService()
            {
                try
                {
                    var config = LocalAppsetting.GetSettingNode<RabbitConf>("Startup", "RabbitMq")?.FirstOrDefault();
                    var factory = new ConnectionFactory()
                    {
                        HostName = config.HostName,
                        UserName = config.UserName,
                        Password = config.Password,
                    };
    
                    if (!string.IsNullOrEmpty(config.Port))
                    {
                        factory.Port = Convert.ToInt32(config.Port);
                    }
                    connection = factory.CreateConnection();
                    channel = connection.CreateModel();
                }
                catch (Exception ex)
                {
                    Log4netHelper.Error(ex.Message +Environment.NewLine+ ex.StackTrace);
                }
            }
    
            public Task StartAsync(CancellationToken cancellationToken)
            {
                Register();
                return Task.CompletedTask;
            }
    
            public async virtual Task<bool> Process(string message)
            {
                throw new NotImplementedException();
            }
    
            public void Register()
            {
                channel.ExchangeDeclare(Exchange, "topic", true, false, null);
                channel.QueueDeclare(QueueName, true, false, false, null);
                channel.QueueBind(QueueName, Exchange, RouteKey);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += async (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var result = await Process(message);
                    if (result)
                    {
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                };
                channel.BasicConsume(queue: QueueName, consumer: consumer);
            }
    
            public void DeRegister()
            {
                connection.Close();
            }
    
            public Task StopAsync(CancellationToken cancellationToken)
            {
                connection.Close();
                return Task.CompletedTask;
            }
        }
        public class RabbitErpFefundConfirmService : RabbitListenerService
        {
            private readonly IRefundServices _refundServices;
    
            public RabbitErpFefundConfirmService(IRefundServices refundServices)
            {
                Exchange = "ErpRefund";
                QueueName = "RefundConfirm";
                RouteKey = "RefundConfirm";
                _refundServices = refundServices;
            }
    
            public async override Task<bool> Process(string message)
            {
                try
                {
                    Log4netHelper.Info(message);
                    var msg = JsonConvert.DeserializeObject<List<RefundAuditRequest>>(message);
                    var result = await _refundServices.RefundAudit(msg);
                    return result;
                }
                catch (Exception err)
                {
                    Log4netHelper.Error(err.Message + Environment.NewLine + err.StackTrace);
                    return false;
                }
            }
    
        }
        public class RabbitMqUtil
        {
            static ConnectionFactory GetConnectionFactory
            {
                get
                {
                    var config = LocalAppsetting.GetSettingNode<RabbitConf>("Startup", "RabbitMq")?.FirstOrDefault();
                    var factory = new ConnectionFactory()
                    {
                        HostName = config.HostName,
                        UserName = config.UserName,
                        Password = config.Password,
                    };
    
                    if (!string.IsNullOrEmpty(config.Port))
                    {
                        factory.Port = Convert.ToInt32(config.Port);
                    }
                    return factory;
                }
            }
    
            public static bool BasicPublish(string exchange, string routeKey, object msg, string queue = "")
            {
                try
                {
                    using (var connection = GetConnectionFactory.CreateConnection())
                    {
                        using (var channel = connection.CreateModel())
                        {
                            channel.ExchangeDeclare(exchange, "topic", true, false, null);
                            if (!string.IsNullOrEmpty(queue))
                            {
                                channel.QueueDeclare(queue, true, false, false, null);
                                channel.QueueBind(queue, exchange, $"#.{routeKey}.#", null);
                            }
    
                            var properties = channel.CreateBasicProperties();
                            properties.DeliveryMode = 2;
    
                            string message = JsonConvert.SerializeObject(msg);
                            channel.BasicPublish(exchange, routeKey, properties, Encoding.UTF8.GetBytes(message));
                            return true;
                        }
                    }
                }
                catch (Exception err)
                {
                    Log4netHelper.Error("发送消息失败:" + err.Message);
                    return false;
                }
    
            }
        }
  • 相关阅读:
    python3.6+requests实现接口自动化4
    python3.6+requests实现接口自动化3
    Druid学习之路 (五)Druid的数据摄取任务类型
    Druid学习之路 (四)Druid的数据采集格式
    Druid学习之路 (三)Druid的数据源和段
    Druid学习之路 (二)Druid架构
    Druid学习之路 (一)Druid初识
    Hive sql和Presto sql的一些对比
    Pyspark的HBaseConverters详解
    Pyspark访问Hbase
  • 原文地址:https://www.cnblogs.com/jonney-wang/p/14025514.html
Copyright © 2011-2022 走看看