zoukankan      html  css  js  c++  java
  • 消息队列应用

    正文

    简介

            它是一种异步传输模式,可以在不同的应用之间实现相互通信,相互通信的应用可以分布在同一台机器上,也可以分布于相连的网络空间中的任一位置。

    它的实现原理是:消息的发送者把自己想要发送的信息放入一个Message中,然后把它保存至一个系统公用空间的消息队列(Message Queue)中;本地或者是异

    地的消息接收程序再从该队列中取出发给它的消息进行处理。如图所示:

        

    优缺点与使用场景

    • 优缺点

      优点:支持离线通讯;有消息优先级;有保障的消息传递和执行许多业务处理的可靠的防故障机制;息传递机制使得消息通信的双方具有不同的物理平台成为可能。

            缺点:很难满足实时交互需求。

    • 使用场景

            1、数据采集:适合多设备多应用数据采集功能。

            2、辅助实时交互:在大并发系统中,某一个操作涉及到很多步骤,某些步骤是不需要及时处理的,将不需要及时处理的步骤提出来,用消息队列处理。

    比如:在一个高并发购物网站,一个顾客下单,顾客的操作记录、顾客的余额记录、商品相关的统计记录是不需要及时处理的,这些可以放消息队列处理,

    延时处理。

            3、多线程流水线处理:可以应用于生产者和消费者模式里面。比如:批量文件解压+解析+入库处理。为了降低服务器压力,将各个步骤分布在不同的

    服务器上面,每个步骤的结果可以放入消息队列。

    准备工作

    • 环境准备

            在使用MSMQ开发之前,需要安装消息队列。按顺序操作:控制面板-》程序-》打开或关闭Windows功能,勾选MSMQ服务器所有选项,如图所示:

            

    • 存放位置 

           在调试、操作队列的时候,插入和取出队列信息以后,需要查看操作是否成功,就得去存放队列的服务器中查看队列信息。操作顺序:计算机-》管理-》

    服务和应用程序,在里面有相应队列存储的信息,如图所示:

            

    MSMQ的代码封装

    • 基础准备

             文件作用:

       

          MSMQ队列操作的属性设置:

        /// <summary>
        /// MSMQ队列操作的属性设置
        /// </summary>
        public class MessageQueueSettings
        {
            /// <summary>
            /// 专用队列的基本路径
            /// </summary>
            private string basePath = "private$";
    
            /// <summary>
            /// 队列的ID
            /// </summary>
            public Guid MessageQueueId { get; set; }
    
            /// <summary>
            /// 队列的标签
            /// </summary>
            public string MessageQueueLabel { get; set; } = "由程序创建的默认消息队列";
    
            /// <summary>
            /// 访问队列的路径,默认是本机,记做“.”
            /// </summary>
            public string RemotePath { get; set; } = ".";
    
            /// <summary>
            /// 队列名称:默认是 msmqdefault
            /// </summary>
            public string MessageQueueName { get; set; } = ConfigSource.DefaultMSMQName;
    
            /// <summary>
            /// 队列的路径
            /// </summary>
            public string MessageQueuePath => $"{RemotePath}\\{basePath}\\{MessageQueueName}";
    
            /// <summary>
            /// 消息队列类型
            /// </summary>
            public QueueType MessageQueueType { get; set; } = QueueType.PrivateQueue;
    
            /// <summary>
            /// 获取或设置队列日志的大小
            /// </summary>
            public long MaximumJournalSize { get; set; } = uint.MaxValue;
    
            /// <summary>
            /// 是否启用日志
            /// </summary>
            public bool UseJournalQueue { get; set; } = true;
    
            /// <summary>
            /// 获取或设置队列的大小
            /// </summary>
            public long MaximumQueueSize { get; set; } = uint.MaxValue;
    
            /// <summary>
            /// 是否启用队列事务
            /// </summary>
            public bool IsUseTransaction { get; set; } = true;
    
            /// <summary>
            /// 队列的访问方式
            /// </summary>
            public QueueAccessMode AccessMode { get; set; } = QueueAccessMode.SendAndReceive;
    
            /// <summary>
            /// 设置/获取是否经过身份验证
            /// </summary>
            public bool Authenticate { get; set; } = false;
    
            /// <summary>
            /// 获取或设置基优先级,“消息队列”使用该基优先级在网络上传送公共队列的消息。
            /// </summary>
            public short BasePriority { get; set; } = 0;
    
        }
    View Code

        队列类型enum:

        /// <summary>
        /// 队列类型
        /// </summary>
        public enum QueueType
        {
            /// <summary>
            /// 私有队列
            /// </summary>
            PrivateQueue = 0,
    
            /// <summary>
            /// 公共队列
            /// </summary>
            PublicQueue = 1,
    
            /// <summary>
            /// 管理队列
            /// </summary>
            ManageQueue = 2,
    
            /// <summary>
            /// 响应队列
            /// </summary>
            ResponseQueue = 3
        }
    View Code
    • 创建队列

          逻辑代码里面不直接调用队列创建方法,直接用操作基类的构造方法创建。思路:初始化参数实体-》初始化待参数实体的操作基类实例-》实例里面构造方法

    判断是否存在存在该队列,如果存在就返回该队列实例,如果不存在,就创建队列,返回实例。

         基础操作类:

            #region 构造函数
    
            /// <summary>
            /// 默认构造函数:不对外开放
            /// </summary>
            private MessageQueueBase() { }
    
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="setting">消息队列的配置</param>
            public MessageQueueBase(MessageQueueSettings setting)
            {
                try
                {
                    this.msetting = setting;
    
                    // 校验是否存在队列
                    if (msetting.RemotePath == "." && !Exists())
                        this.mqueue = Create();
                    else
                        this.mqueue = new MessageQueue(msetting.MessageQueuePath, msetting.AccessMode);
    
                    if (msetting.RemotePath == ".")
                    {
                        // 设置是否经过身份验证:默认为否
                        this.mqueue.Authenticate = msetting.Authenticate;
    
                        // 是否启用日志队列
                        this.mqueue.UseJournalQueue = msetting.UseJournalQueue;
    
                        // 最大日志队列长度:最大值为uint.MaxValue
                        this.mqueue.MaximumJournalSize =
                            msetting.MaximumJournalSize > uint.MaxValue ?
                            uint.MaxValue :
                            msetting.MaximumJournalSize;
    
                        // 最大队列长度:最大值为uint.MaxValue
                        this.mqueue.MaximumQueueSize = msetting.MaximumQueueSize > uint.MaxValue ? uint.MaxValue : msetting.MaximumJournalSize;
    
                        // 队列标签
                        this.mqueue.Label = msetting.MessageQueueLabel;
                    }
    
                    // 设置基优先级,默认是0
                    this.mqueue.BasePriority = msetting.BasePriority;
    
                    // 获取消息队列的id
                    msetting.MessageQueueId = mqueue.Id;
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
           ~MessageQueueBase()
            {
                this.mqueue?.Close();
                GC.SuppressFinalize(this);
            }
            #endregion
    
            #region 操作方法
    
            /// <summary>
            /// 验证队列是否存在
            /// </summary>        
            /// <returns>验证结果</returns>
            public bool Exists()
            {
                var successed = false;
                try
                {
                    successed = MessageQueue.Exists(msetting.MessageQueuePath);
                }
                catch (Exception ex)
                {
                    throw ex;
                }
                return successed;
            }
    
            /// <summary>
            /// 创建队列
            /// </summary>
            /// <param name="type">队列类型(默认是专用队列)</param>
            /// <returns>队列对象</returns>
            public MessageQueue Create()
            {
                MessageQueue queue = default(MessageQueue);
                try
                {
                    queue = MessageQueue.Create(msetting.MessageQueuePath, msetting.IsUseTransaction);
                }
                catch (Exception ex)
                {
                    throw ex;
                }
                return queue;
            }
    
            /// <summary>
            /// 删除队列
            /// </summary>
            public void Delete()
            {
                try
                {
                    MessageQueue.Delete(msetting.MessageQueuePath);
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
            #endregion
    View Code

        业务调用类:

            /// <summary>
            /// 创建消息队列
            /// </summary>
            /// <param name="queueName">队列名称</param>
            /// <param name="transaction">是否是事务队列</param>
            /// <returns></returns>
            public bool CreateMessageQueue(string queueName, bool transaction)
            {
                bool result = false;
                MessageQueueSettings mqs = new MessageQueueSettings()
                {
                     MessageQueueName= queueName,
                     IsUseTransaction= transaction
                };
                IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs);
                result=messageQueueBase.Exists();
                return result;
            }
    View Code

       测试结果:

         

         

    • 发送消息

          发送消息分为单条发送和列表发送,是否启用事务,以当前需求和队列属性决定。

          基础操作类:

    #region 发送操作
            /// <summary>
            /// 发送数据到队列
            /// </summary>
            /// <typeparam name="T">发送的数据类型</typeparam>
            /// <param name="t">发送的数据对象</param>
            public void Send<T>(T t)
            {
                MessageQueueTransaction tran = null;
                try
                {
                    tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;
                    using (Message message = new Message())
                    {
                        tran?.Begin();
                        message.Body = t;
                        message.Label = $"推送时间[{DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss.fff")}]";
                        message.UseDeadLetterQueue = true;
                        message.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                        message.TimeToBeReceived = new TimeSpan(30, 0, 0, 0);
                        message.TimeToReachQueue = new TimeSpan(30, 0, 0, 0);
                        if (tran != null)
                        {
                            mqueue.Send(message, tran);
                        }
                        else
                        {
                            mqueue.Send(message);
                        }
                        tran?.Commit();
                        tran?.Dispose();
                    }
                }
                catch (Exception ex)
                {
                    tran?.Abort();
                    tran?.Dispose();
                    throw ex;
                }
            }
    
            /// <summary>
            /// 发送多条数据
            /// </summary>
            /// <typeparam name="T">发送的数据类型</typeparam>
            /// <param name="ts">发送的数据对象集合</param>
            public void SendList<T>(IList<T> ts)
            {
                MessageQueueTransaction tran = null;
                try
                {
                    tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;
                    tran?.Begin();
                    foreach (var item in ts)
                    {
                        using (Message message = new Message())
                        {
                            message.Body = item;
                            message.Label = $"推送时间[{DateTime.Now.ToString("yyyy-MM-dd HH-mm-ss.fff")}]";
                            message.UseDeadLetterQueue = true;
                            message.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                            message.TimeToBeReceived = new TimeSpan(30, 0, 0, 0);
                            message.TimeToReachQueue = new TimeSpan(30, 0, 0, 0);
                            if (tran != null)
                            {
                                mqueue.Send(message, tran);
                            }
                            else
                            {
                                mqueue.Send(message);
                            }
                        }
                    }
                    tran?.Commit();
                    tran?.Dispose();
                }
                catch (Exception ex)
                {
                    tran?.Abort();
                    tran?.Dispose();
                    throw ex;
                }
            }
    
            #endregion
    View Code

          业务调用类:

      /// <summary>
            /// 发送消息
            /// </summary>
            /// <param name="queueName">队列名称</param>
            /// <param name="priority">优先级</param>
            /// <param name="isTransaction">是否启用队列事务</param>
            /// <param name="userInfo">新增用户信息</param>
            public void SendMessage(string queueName, short priority, bool isTransaction, user userInfo)
            {
                MessageQueueSettings mqs = new MessageQueueSettings()
                {
                    MessageQueueName = queueName,
                    BasePriority = priority,
                    IsUseTransaction = isTransaction
                };
                IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs);
                messageQueueBase.Send<user>(userInfo);
            }
    View Code

          测试结果:

          

          

    • 接收消息

           消息接收分为同步接收和异步接收。

           基础操作类:

    #region 接收方法
    
            /// <summary>
            /// 同步获取队列消息
            /// </summary>
            /// <typeparam name="T">返回的数据类型</typeparam>
            /// <param name="action">获取数据后的回调</param>
            public void Receive<T>(Action<T> action) where T : class
            {
                try
                {
                    mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                    Message message = mqueue.Receive();
                    T obj = message.Body as T;
                    if (obj != null)
                        action(obj);
                    else
                        throw new InvalidCastException("队列获取的类型与泛型类型不符");
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
            /// <summary>
            /// 同步查询但不移除队列第一条数据
            /// </summary>
            /// <typeparam name="T">返回的数据类型</typeparam>
            /// <param name="action">获取数据后的回调</param>
            public void Peek<T>(Action<T> action) where T : class
            {
                try
                {
                    mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                    Message message = mqueue.Peek();
                    T obj = message.Body as T;
                    if (obj != null)
                        action(obj);
                    else
                        throw new InvalidCastException("队列获取的类型与泛型类型不符");
                }
                catch (Exception ex)
                {
    
                    throw ex;
                }
            }
    
            /// <summary>
            /// 获取所有消息
            /// </summary>
            /// <typeparam name="T">数据类型</typeparam>
            /// <returns>取出来的数据</returns>
            public IList<T> GetAll<T>() where T : class
            {
                try
                {
                    mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                    var enumerator = mqueue.GetMessageEnumerator2();
                    var ret = new List<T>();
                    while (enumerator.MoveNext())
                    {
                        var messageInfo = enumerator.Current.Body as T;
                        if (messageInfo != null)
                        {
                            ret.Add(messageInfo);
                        }
                        enumerator.RemoveCurrent();
                        enumerator.Reset();
                    }
                    return ret;
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
            /// <summary>
            /// 异步接收队列消息,并删除接收的消息数据
            /// </summary>
            /// <typeparam name="T">返回的数据类型</typeparam>
            /// <param name="action">获取数据后的回调</param>
            public void ReceiveAsync<T>(Action<T> action) where T : class
            {
                MessageQueueTransaction tran = null;
                try
                {
                    tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;
    
                    if (!initReceiveAsync)
                    {
                        mqueue.ReceiveCompleted += (sender, e) =>
                        {
                            Message message = mqueue.EndReceive(e.AsyncResult);
                            T obj = message.Body as T;
                            if (obj != null)
                                action(obj);
                            else
                                throw new InvalidCastException("队列获取的类型与泛型类型不符");
                        };
                        initReceiveAsync = true;
                    }
                    mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                    tran?.Begin();
                    mqueue.BeginReceive();
                    tran?.Commit();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
            /// <summary>
            /// 启动一个没有超时设定的异步查看操作。直到队列中出现消息时,才完成此操作。
            /// </summary>
            /// <typeparam name="T">获取的消息数据的类型</typeparam>
            /// <param name="action">获取数据后的回调</param>
            public void PeekAsync<T>(Action<T> action) where T : class
            {
                MessageQueueTransaction tran = null;
                try
                {
                    tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;
                    if (!initPeekAsync)
                    {
                        mqueue.PeekCompleted += (sender, e) =>
                        {
                            Message message = mqueue.EndPeek(e.AsyncResult);
                            T obj = message.Body as T;
                            if (obj != null)
                                action(obj);
                            else
                                throw new InvalidCastException("队列获取的类型与泛型类型不符");
                        };
                        initPeekAsync = true;
                    }
                    mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                    tran?.Begin();
                    mqueue.BeginPeek();
                    tran?.Commit();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
            /// <summary>
            /// 异步循环获取队列消息,调用此方法将一直接收队列数据,直到IsStopReceiveLoop被设置为true
            /// </summary>
            /// <typeparam name="T">返回的数据类型</typeparam>
            /// <param name="action">获取数据后的回调</param>
            public void ReceiveAsyncLoop<T>(Action<T> action) where T : class
            {
                MessageQueueTransaction tran = null;
                try
                {
                    tran = msetting.IsUseTransaction || mqueue.Transactional ? new MessageQueueTransaction() : null;
                    if (!initReceiveAsync)
                    {
                        mqueue.ReceiveCompleted += (sender, e) =>
                        {
                            Message message = mqueue.EndReceive(e.AsyncResult);
                            T obj = message.Body as T;
                            if (obj != null)
                                action(obj);
                            else
                                throw new InvalidCastException("队列获取的类型与泛型类型不符");
    
                            //只要不停止就一直接收
                            if (!IsStopReceiveLoop)
                            {
                                tran?.Begin();
                                mqueue.BeginReceive();
                                tran?.Commit();
                            }
                        };
                        initReceiveAsync = true;
                    }
                    mqueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                    IsStopReceiveLoop = false;
                    tran?.Begin();
                    mqueue.BeginReceive();
                    tran?.Commit();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    
            #endregion
    View Code

          业务调用类:

     /// <summary>
            /// 接收消息
            /// </summary>
            /// <param name="queueName">队列</param>
            /// <param name="isAsy">是否异步</param>
            public user ReceiveMessage(string queueName, bool isAsy)
            {
                user userInfo = new user();
                MessageQueueSettings mqs = new MessageQueueSettings()
                {
                    MessageQueueName = queueName
                };
                IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs);
                if (!isAsy)
                {
                    messageQueueBase.Receive<user>(p =>
                    {
                        userInfo = p;
                    });
                }
                else
                {
                    messageQueueBase.ReceiveAsync<user>(p =>
                    {
                        userInfo = p;
                    });
                }
                return userInfo;
            }
    
            /// <summary>
            /// 获取全部
            /// </summary>
            /// <param name="queueName"></param>
            /// <returns></returns>
            public IList<user> GetAll(string queueName)
            {
                MessageQueueSettings mqs = new MessageQueueSettings()
                {
                    MessageQueueName = queueName
                };
                IMessageQueueBase messageQueueBase = new MessageQueueBase(mqs);
                return messageQueueBase.GetAll<user>();
            }
    View Code

          测试结果:

          点击插入数据,在队列中插入N条数据。同步接收和异步接收每次只获取队列第一条数据;异步循环是每次获取一条数据,循环获取,知道队列没有数据;

    接收全部是一次性获取指定队列所有数据。

         

         

    补充

            下一篇写Dapper应用或Redis应用。

  • 相关阅读:
    人脸识别的一些网站
    41、过滤驱动程序
    13、ActiveX控件
    42、驱动程序调试
    20、宽字符串与字符串间的相互转换
    14、HOOK和数据库访问
    43、Windows驱动程序模型笔记(一)
    7、注册表读写的一个例子
    12、动态链接库,dll
    40、总结IRP,handling IRPs,Part II
  • 原文地址:https://www.cnblogs.com/harveybarray/p/6628115.html
Copyright © 2011-2022 走看看