zoukankan      html  css  js  c++  java
  • .net平台RabbitMQ封装及使用

    public abstract class MqHelper
        {
            protected static IConnection defaultConnection = null;
            private static ConnectionFactory ConnectionFactory;
            public static string MqConnection = ConfigurationManager.AppSettings["MqConnection"];
    
            static MqHelper()
            {
                ConnectionFactory = new ConnectionFactory();
                ConnectionFactory.Uri = new Uri(MqConnection);
                ConnectionFactory.AutomaticRecoveryEnabled = true;
                ConnectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
                defaultConnection = ConnectionFactory.CreateConnection();
            }
    
            /// <summary>
            /// 获取默认的消息队列连接
            /// 今天的参考资料说:“线程间不能共享通道实例”。
            /// </summary>
            public static IConnection DefaultConnection
            {
                get
                {
                    if (defaultConnection == null)
                    {
                        //读取配置文件
                        string uri = System.Configuration.ConfigurationManager.AppSettings["MqConnection"];
                        defaultConnection = CreateConnectionByUri(uri);
    
                        #region 例子
                        // factory.Uri ="amqp://guest:guest@localhost:5672/%2f"
                        //factory.HostName = "localhost";   //主机地址
                        //factory.VirtualHost = "/";        //多租户时 虚拟主机地址
                        //factory.Port = 5672;              //服务端口号
                        //factory.UserName = "guest";       //用户名
                        //factory.Password = "guest";       //密码
                        //factory.CreateConnection();
                        #endregion
                    }
                    return defaultConnection;
                }
            }
    
            /// <summary>
            /// 通过Uri 创建消息队里连接
            /// </summary>
            /// <param name="uri">amqp://guest:guest@localhost:5672/vhost</param>
            /// <returns></returns>
            public static IConnection CreateConnectionByUri(string uri)
            {
                var factory = new ConnectionFactory();
                factory.Uri = new Uri(uri);
                return factory.CreateConnection();
            }
    
    
            /// <summary>
            /// 创建连接
            /// </summary>
            /// <param name="hostName">消息队列服务器地址</param>
            /// <param name="vHost">虚拟主机</param>
            /// <param name="port">端口号</param>
            /// <param name="userName">用户名,  默认值:guest</param>
            /// <param name="password">用户密码,  默认值:guest</param>
            /// <returns></returns>
            /// <example>
            /// using (var connection = MqHelper.CreateConnection()) 
            /// using (var channel = connection.CreateModel())
            /// { 
            ///     ///ToDo:处理
            /// }
            /// </example>
            public static IConnection CreateConnection(string hostName = "", string virtualHost = "/", int port = 5672, string userName = "", string password = "")
            {
                var factory = new ConnectionFactory();
                //主机地址
                factory.HostName = hostName;
                //多租户时 虚拟主机地址
                factory.VirtualHost = virtualHost;
                //端口号
                factory.Port = port;
                //用户名密码为空时从配置文件获取               
                factory.UserName = string.IsNullOrEmpty(userName) ? "guest" : userName;
                factory.Password = string.IsNullOrEmpty(password) ? "guest" : password;
    
                IConnection connection = factory.CreateConnection();
                return connection;
            }
    
            /// <summary>
            /// 创建通道
            /// </summary>
            /// <param name="conn"></param>
            /// <returns></returns>
            public static IModel CetateChannel(IConnection conn = null)
            {
                conn = conn ?? DefaultConnection;
                return conn.CreateModel();
            }
    
            /// <summary>
            /// 声明交换器
            /// </summary>
            /// <param name="exchangeName">交换器名称</param>
            /// <param name="type">类型</param>
            /// <param name="chanel">通道</param>
            /// <param name="durable">是否持久化,默认: true</param>
            /// <param name="autoDelete">自动删除:一旦客户端连接断开则自动删除,默认: false</param>
            /// <param name="arguments">其他参数:如果安装了队列优先级插件则可以设置优先级</param>
            public static void ExchangeDeclare(string exchangeName, string type, IModel chanel = null, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
            {
                IModel ch = chanel ?? CetateChannel();
                ch.ExchangeDeclare(exchange: exchangeName,   //队列名
                                type: type,                 // 类型
                                durable: true,              //是否持久化
                                autoDelete: false,          //一旦客户端连接断开则自动删除queue
                                arguments: null);           //如果安装了队列优先级插件则可以设置优先级
                if (chanel == null)
                {
                    //销毁自动创建的 通道
                    ch.Close();
                    ch.Dispose();
                }
            }
    
            /// <summary>
            /// 声明队列或者查询队列信息
            /// 通过配置加载程序中无需手动声明,队列多次声明不会重复创建
            /// 但是:队列多次被声明,并且配置与已有的不一致时,会抛出 OperationInterruptedException 异常
            /// </summary>
            /// <param name="queueName">队列名称</param>
            /// <param name="channel">与消息队列连接的通道</param>
            /// <param name="durable">是否持久化,默认: true</param>
            /// <param name="exclusive">排它,默认:false </param>
            /// <param name="autoDelete">自动删除:一旦客户端连接断开则自动删除,默认: false</param>
            /// <param name="arguments">其他参数:如果安装了队列优先级插件则可以设置优先级</param>
            public static QueueDeclareOk QueueDeclare(string queueName, IModel channel, bool durable = true, bool exclusive = false, bool autoDelete = false, IDictionary<string, object> arguments = null)
            {
                return channel.QueueDeclare(queue: queueName,   //队列名
                                durable: true,      //是否持久化
                                exclusive: false,   //排它
                                autoDelete: false,  //一旦客户端连接断开则自动删除queue
                                arguments: null);   //如果安装了队列优先级插件则可以设置优先级
            }
    
            /// <summary>
            /// 发送消息
            /// </summary>
            /// <param name="exchange">交换器</param>
            /// <param name="routingKey">routingKey</param>
            /// <param name="message">消息</param>
            /// <param name="chanel"></param>
            /// <param name="mandatory"></param>
            /// <param name="basicProperties"></param>
            public static void Publish(string exchange, string routingKey, dynamic message, IModel chanel = null, bool mandatory = false, IBasicProperties basicProperties = null)
            {
                IModel ch = chanel ?? CetateChannel();
                var body = MessageUtility.GetBytes(message);
                //ch.BasicPublish(exchange: exchange, routingKey: routingKey, mandatory: mandatory, basicProperties: basicProperties, body:body);
                ch.BasicPublish(exchange, routingKey, mandatory, basicProperties, body);
                //销毁自动创建的通道
                if (chanel == null)
                {
                    ch.Close();
                    ch.Dispose();
                }
            }
    
            /// <summary>
            /// 发送消息
            /// </summary>
            /// <param name="exchange">交换器</param>
            /// <param name="routingKey">routingKey</param>
            /// <param name="body">消息内容</param>
            /// <param name="chanel"></param>
            /// <param name="mandatory"></param>
            /// <param name="basicProperties"></param>
            public static void Publish(string exchange, string routingKey, byte[] body, IModel chanel = null, bool mandatory = false, IBasicProperties basicProperties = null)
            {
                IModel ch = (chanel != null) ? chanel : CetateChannel();
                ch.BasicPublish(exchange: exchange, routingKey: routingKey, mandatory: mandatory, basicProperties: basicProperties, body: body);
    
                //销毁自动创建的通道
                if (chanel == null)
                {
                    ch.Close();
                    ch.Dispose();
                }
            }
        }
    public static class MessageUtility
        {
            /// <summary>
            /// 序列化: 将对象转二进制数组
            /// </summary>
            /// <param name="obj"></param>
            /// <returns></returns>
            public static byte[] GetBytes(dynamic obj)
            {
                if (obj == null)
                {
                    return null;
                }
                string json = JsonConvert.SerializeObject(obj);
                return Encoding.UTF8.GetBytes(json);
            }
            /// <summary>
            /// 反序列化:二进制数组转类 
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <param name="messageBody"></param>
            /// <returns></returns>
            public static T ReadBytes<T>(byte[] messageBody)
            {
                if (messageBody == null || messageBody.Length < 1)
                {
                    return default(T);
                }
                var json = Encoding.UTF8.GetString(messageBody);
                if (typeof(T) == typeof(string))
                {
                    return (T)(object)(json);
                }
                T obj = JsonConvert.DeserializeObject<T>(json);
                return obj;
            }
        }
    public static class MqService
        {
            public static Dictionary<string, IModel> Channels = new Dictionary<string, IModel>();
            static Dictionary<string, string> Tags = new Dictionary<string, string>();
    
            /// <summary>
            /// 注册消息队列接收
            /// </summary>
            /// <param name="routKey">路由</param>
            /// <param name="OnReceived">收到后的消息处理</param>
            public static IModel RegisterConsume(string routKey, EventHandler<BasicDeliverEventArgs> OnReceived, bool autoAck = true)
            {
                IModel sysChannel = Channels.ContainsKey(routKey) ? Channels[routKey] : null;
                if (sysChannel == null || sysChannel.IsClosed)
                {
                    sysChannel = MqHelper.CetateChannel();
                    EventingBasicConsumer syslConsumer = new EventingBasicConsumer(sysChannel);
                    syslConsumer.Received += OnReceived;
                    var tag = sysChannel.BasicConsume(routKey, autoAck, syslConsumer);
                    if (!Channels.ContainsKey(routKey))
                    {
                        Channels.Add(routKey, sysChannel);
                        Tags.Add(routKey, tag);
                    }
                    else
                    {
                        Channels[routKey] = sysChannel;
                        Tags[routKey] = tag;
                    }
                }
                return sysChannel;
            }
    
            /// <summary>
            /// 消息接收以后ACK
            /// </summary>
            /// <param name="e"></param>
            /// <param name="routKey"></param>
            public static void Ack(this BasicDeliverEventArgs e, string routKey)
            {
                if (!Channels.ContainsKey(routKey))
                {
                    return;
                }
                IModel sysChannel = Channels[routKey];
                sysChannel.BasicAck(e.DeliveryTag, false);
            }
    
            ////事件处理的例子
            //private static void SystemLog_OnReceived(object sender, BasicDeliverEventArgs e)
            //{
            //    try
            //    {
            //        IModel sysChannel = Channels[routKey];
            //        channel.basicAck(e.DeliveryTag, false);
            //        SystemLog log = MessageUtility.ReadBytes<SystemLog>(e.Body);
            //        int val = dataService.AddLog(log);
            //        if (val < 1)
            //        {
            //            LogManager.GetLogger(Assembly.GetExecutingAssembly().GetName().Name).Info("忽略系统日志:" + JsonConvert.SerializeObject(log));
            //        }
            //    }
            //    catch (Exception ex)
            //    {
            //        LogManager.GetLogger(Assembly.GetExecutingAssembly().GetName().Name).Error(ex);
            //    }
            //}
    
            /// <summary>
            /// 关闭 消息队列监听
            /// </summary>
            /// <param name="routKey"></param>
            public static void UnRegister(string routKey)
            {
                IModel chanel = null;
                try
                {
                    if (!Channels.ContainsKey(routKey) && !Tags.ContainsKey(routKey))
                    {
                        return;
                    }
                    chanel = Channels[routKey];
                    var tag = Tags[routKey];
                    chanel.BasicCancel(tag);
                    Channels.Remove(routKey);
                    Tags.Remove(routKey);
                }
                finally
                {
                    chanel?.Close();
                }
            }
        }

    调用:

    //读取配置文件

    public string RoutKey = ConfigurationManager.AppSettings["MQ_Info"];

    //接收MQ消息

     MqService.RegisterConsume(RoutKey, OnReceived, false);

    private void OnReceived(object sender, BasicDeliverEventArgs e)
    {
    try
    {

    //把消息进行格式转换
    var model = MessageUtility.ReadBytes<Model>(e.Body);

    //确认
    MqService.Ack(e, RoutKey);
    }
    catch (Exception ex)
    {
    }
    }

  • 相关阅读:
    操作系统学习笔记:银行家算法的回顾和训练
    操作系统学习笔记:内存学习随笔
    操作系统笔记:内存的连续管理
    操作系统笔记:内存的离散管理
    操作系统:内存管理复习ing之页面置换算法
    马原学习日记1:实践
    bootstrap简单教程
    css-6(媒体识别)
    css-5(弹性盒子)
    css-3(旋转+过渡)
  • 原文地址:https://www.cnblogs.com/nayilvyangguang/p/13224127.html
Copyright © 2011-2022 走看看