zoukankan      html  css  js  c++  java
  • MQ单一消息完整流程

    public class QueueManger
        {
            private static string QueuePath = @".private${0}";
                
            /// <summary>
            /// 创建MSMQ队列
            /// </summary>
            /// <param name="queueName">队列路径</param>
            /// <param name="transactional">是否事务队列</param>
            public static void Createqueue(string queueName, bool transactional = false)
            {
                try
                {
                    QueuePath = string.Format(QueuePath, queueName);
                    //判断队列是否存在
                    if (!MessageQueue.Exists(QueuePath))
                    {
                        MessageQueue.Create(QueuePath);
                        LoggerFile.Write(QueuePath + "已成功创建!"); 
                    }
                    else
                    {
                        LoggerFile.Write(QueuePath + "已经存在!"); 
                    }
                }
                catch (MessageQueueException e)
                {
                    LoggerFile.Write(e.Message); 
                }
            }
            /// <summary>
            /// 删除队列
            /// </summary>
            /// <param name="queueName"></param>
            public static void Deletequeue(string queueName)
            {
                try
                {
                    QueuePath = string.Format(QueuePath, queueName);
                    //判断队列是否存在
                    if (MessageQueue.Exists(QueuePath))
                    {
                        MessageQueue.Delete(QueuePath);
                        LoggerFile.Write(QueuePath + "已删除!");
                    }
                    else
                    {
                        LoggerFile.Write(QueuePath + "不存在!");
                    }
                }
                catch (MessageQueueException e)
                {
                    LoggerFile.Write(e.Message);
                }
            }
            /// <summary>
            /// 发送消息
            /// </summary>
            /// <typeparam name="T">用户数据类型</typeparam>
            /// <param name="target">用户数据</param>
            /// <param name="queueName">队列名称</param>
            /// <param name="tran"></param>
            /// <returns></returns>
            public static bool SendMessage<T>(T target, string queueName, MessageQueueTransaction tran = null)
            {
                try
                {
                    QueuePath = string.Format(QueuePath, queueName);
                    //连接到本地的队列
                    MessageQueue myQueue = new MessageQueue(QueuePath);
                    System.Messaging.Message myMessage = new System.Messaging.Message();
                    myMessage.Body = target;
                    myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                    //发送消息到队列中
                    if (tran == null)
                    {
                        myQueue.Send(myMessage);
                    }
                    else
                    {
                        myQueue.Send(myMessage, tran);
                    }
                    LoggerFile.Write("消息已成功发送到" + queueName + "队列!");
                    return true;
                }
                catch (ArgumentException e)
                {
                    LoggerFile.Write(e.Message);
                    return false;
                }
            }
            /// <summary>
            /// 接收消息
            /// </summary>
            /// <typeparam name="T">用户的数据类型</typeparam>
            /// <param name="queueName">消息路径</param>
            /// <returns>用户填充在消息当中的数据</returns>
            public static T ReceiveMessage<T>(string queueName, MessageQueueTransaction tran = null)
            {
                QueuePath = string.Format(QueuePath, queueName);
                //连接到本地队列
                MessageQueue myQueue = new MessageQueue(QueuePath);
                myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                try
                {
                    //从队列中接收消息
                    System.Messaging.Message myMessage = tran == null ? myQueue.Receive() : myQueue.Receive(tran);
                    return (T)myMessage.Body; //获取消息的内容
                }
                catch (MessageQueueException e)
                {
                    LoggerFile.Write(e.Message);
                }
                catch (InvalidCastException e)
                {
                    LoggerFile.Write(e.Message);
                }
                return default(T);
            }
            /// <summary>
            /// 采用Peek方法接收消息
            /// </summary>
            /// <typeparam name="T">用户数据类型</typeparam>
            /// <param name="queueName">队列路径</param>
            /// <returns>用户数据</returns>
            public static T ReceiveMessageByPeek<T>(string queueName)
            {
                QueuePath = string.Format(QueuePath, queueName);
                //连接到本地队列
                MessageQueue myQueue = new MessageQueue(QueuePath);
                myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                try
                {
                    //从队列中接收消息
                    System.Messaging.Message myMessage = myQueue.Peek();
                    return (T)myMessage.Body; //获取消息的内容
                }
                catch (MessageQueueException e)
                {
                    LoggerFile.Write(e.Message);
                }
                catch (InvalidCastException e)
                {
                    LoggerFile.Write(e.Message);
                }
                return default(T);
            }
            /// <summary>
            /// 获取队列中的所有消息
            /// </summary>
            /// <typeparam name="T">用户数据类型</typeparam>
            /// <param name="queueName">队列路径</param>
            /// <returns>用户数据集合</returns>
            public static List<T> GetAllMessage<T>(string queueName)
            {
                QueuePath = string.Format(QueuePath, queueName);
                MessageQueue myQueue = new MessageQueue(QueuePath);
                myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                try
                {
                    Message[] msgArr = myQueue.GetAllMessages();
                    List<T> list = new List<T>();
                    msgArr.ToList().ForEach((o) =>
                    {
                        list.Add((T)o.Body);
                    });
                    return list;
                }
                catch (Exception e)
                {
                    LoggerFile.Write(e.Message);
                }
                return null;
            }
        }
    

      

  • 相关阅读:
    20170803 Airflow自带的API进行GET 和POST动作部分内容
    20170731 培训Bootstrap
    20170728 Celery项目 后台处理SQL SERVER的一个异常
    python 之 递归
    编译型语言和解释型语言的区别
    如何在命令行中让python2和python3同存
    bzoj 1579: [Usaco2009 Feb]Revamping Trails 道路升级——分层图+dijkstra
    单调栈题目总结
    汕头市队赛SRM15
    codevs 1269 匈牙利游戏——次短路(spfa)
  • 原文地址:https://www.cnblogs.com/Agui520/p/8041843.html
Copyright © 2011-2022 走看看