接上篇的MQ配置,本篇利用C#实现MQ消息的收发,源码如下。
1.需要在项目中引用的dll是amqmdnet.dll(代码中对应的命名空间为IBM.WMQ)
2.app.config配置
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <!-- MQ client connection settings; each key is looked up by name via AppSettings["..."]. -->
  <appSettings>
    <!-- Host name / IP address and listener port of the MQ server. -->
    <add key="HostName" value="192.168.1.40"/>
    <add key="Port" value="1418"/>
    <!-- Server-connection channel used by the client. -->
    <add key="Channel" value="CLIENT.QM_ORANGE"/>
    <!-- Queue manager to connect to and the queue to put to / get from. -->
    <add key="QueueManager" value="QM_APPLE"/>
    <add key="Queue" value="Q1"/>
  </appSettings>
  <connectionStrings>
    <!-- Database connection string. -->
    <add name="connectionString"
         connectionString="Data Source=(local);Initial Catalog=TestDb; Integrated Security=SSPI" />
  </connectionStrings>
</configuration>
3.MQ操作类
using System;
using System.Data;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using IBM.WMQ;
using System.Xml;
using System.Configuration;
using System.Windows.Forms;

namespace WindowsFormsApplicationMQ
{
    /// <summary>
    /// Small helper around the IBM MQ classes for .NET (<c>IBM.WMQ</c>) that connects to a
    /// queue manager described in <c>appSettings</c>, puts a text message on a queue and
    /// reads a message back into a <see cref="DataSet"/>.
    /// </summary>
    /// <remarks>
    /// Every send/receive call closes the queue and disconnects from the queue manager when
    /// it finishes, so each call re-establishes the connection on demand if necessary.
    /// Errors are reported to the user through <see cref="MessageBox"/>.
    /// </remarks>
    class Management
    {
        // Name of the queue to open, read from appSettings["Queue"].
        string queueName;
        // Connection to the queue manager; null until LinkToQueueManager succeeds.
        MQQueueManager qMgr;
        // Message buffer used by the most recent send/receive call.
        MQMessage mqMsg;
        // Queue handle that is open only for the duration of a send/receive call.
        MQQueue queue;
        MQPutMessageOptions putOptions;
        // Human-readable result of the last connection attempt.
        string linkStatus;

        #region Connect to the queue manager

        public Management()
        {
        }

        /// <summary>
        /// Reads the connection settings from <c>appSettings</c> and connects to the
        /// configured queue manager unless a live connection already exists.
        /// </summary>
        /// <returns>
        /// A status message describing success, or the completion/reason code (or exception
        /// text) when the connection attempt failed. This method does not throw for
        /// configuration or connection errors; they are reported through the return value.
        /// </returns>
        public string LinkToQueueManager()
        {
            try
            {
                // NOTE(review): ConfigurationSettings is marked obsolete by the framework in
                // favour of ConfigurationManager; it is kept here so that no additional
                // assembly reference (System.Configuration.dll) is required.
                queueName = ConfigurationSettings.AppSettings["Queue"];

                Environment.SetEnvironmentVariable("MQCCSID", "1381");

                // Register the CCSID whenever it is missing. The previous check
                // (properties.Count <= 0) skipped it as soon as any other property was set.
                if (!MQEnvironment.properties.ContainsKey(MQC.CCSID_PROPERTY))
                {
                    MQEnvironment.properties.Add(MQC.CCSID_PROPERTY, 1381);
                }

                // Parsed inside the try block so that a malformed "Port" value is reported
                // through the returned status string instead of escaping as an exception.
                MQEnvironment.Port = Convert.ToInt32(ConfigurationSettings.AppSettings["Port"]);
                MQEnvironment.Channel = ConfigurationSettings.AppSettings["Channel"];
                MQEnvironment.Hostname = ConfigurationSettings.AppSettings["HostName"];
                string qmName = ConfigurationSettings.AppSettings["QueueManager"];

                if (qMgr == null || !qMgr.IsConnected)
                {
                    qMgr = new MQQueueManager(qmName);
                }

                linkStatus = "连接队列管理器:" + "成功!";
            }
            catch (MQException e)
            {
                linkStatus = "连接队列管理器错误: 结束码:" + e.CompletionCode + " 错误原因代码:" + e.ReasonCode;
            }
            catch (Exception e)
            {
                linkStatus = "连接队列管理器错误: 结束码:" + e;
            }

            return linkStatus;
        }

        /// <summary>
        /// Makes sure a live queue manager connection exists, connecting on demand.
        /// </summary>
        /// <returns><c>true</c> when <c>qMgr</c> is connected; otherwise <c>false</c>
        /// (in which case <c>linkStatus</c> holds the failure description).</returns>
        private bool EnsureConnected()
        {
            if (qMgr == null || !qMgr.IsConnected)
            {
                LinkToQueueManager();
            }

            return qMgr != null && qMgr.IsConnected;
        }

        /// <summary>
        /// Best-effort cleanup: closes the open queue (if any) and disconnects from the
        /// queue manager. Failures are traced rather than thrown, so this is safe to call
        /// from a <c>finally</c> block.
        /// </summary>
        private void CloseQueueAndDisconnect()
        {
            try
            {
                if (queue != null)
                {
                    queue.Close();
                }
            }
            catch (MQException e)
            {
                System.Diagnostics.Trace.TraceWarning("Closing MQ queue failed: " + e.Message);
            }
            finally
            {
                // Never reuse a handle that belongs to a closed connection.
                queue = null;
            }

            try
            {
                if (qMgr != null && qMgr.IsConnected)
                {
                    qMgr.Disconnect();
                }
            }
            catch (MQException e)
            {
                System.Diagnostics.Trace.TraceWarning("Disconnecting from MQ queue manager failed: " + e.Message);
            }
        }

        #endregion

        #region Send a message

        /// <summary>
        /// Puts <paramref name="message"/> on the configured queue as a string message,
        /// then closes the queue and disconnects from the queue manager.
        /// </summary>
        /// <param name="message">Text to write into the message body.</param>
        /// <remarks>
        /// Connection, open and put failures are shown in a message box; the method then
        /// returns without sending.
        /// </remarks>
        public void SendMsg(string message)
        {
            if (!EnsureConnected())
            {
                MessageBox.Show(linkStatus);
                return;
            }

            int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_SHARED | MQC.MQOO_INQUIRE;

            try
            {
                try
                {
                    // Try to open the queue.
                    queue = qMgr.AccessQueue(queueName, openOptions);
                }
                catch (MQException e)
                {
                    MessageBox.Show("打开队列失败:" + e.Message);
                    // Without an open queue there is nothing to put to; continuing would
                    // dereference a null (or stale) queue handle.
                    return;
                }

                mqMsg = new MQMessage();
                mqMsg.WriteString(message);
                putOptions = new MQPutMessageOptions();

                try
                {
                    // Put the message on the queue.
                    queue.Put(mqMsg, putOptions);
                }
                catch (MQException mqe)
                {
                    MessageBox.Show("发送异常终止:" + mqe.Message);
                }
            }
            finally
            {
                CloseQueueAndDisconnect();
            }
        }

        #endregion

        #region Receive a message

        /// <summary>
        /// Gets one message from the configured queue and loads its XML body into a
        /// <see cref="DataSet"/> containing a pre-defined <c>T_School</c> table
        /// (columns <c>ID</c>, <c>SchoolName</c>, <c>BuildDate</c>, <c>Address</c>).
        /// The queue is closed and the queue manager disconnected before returning.
        /// </summary>
        /// <returns>
        /// The populated data set, or <c>null</c> when the queue is empty, cannot be
        /// opened, or an MQ error occurs (the error is shown in a message box).
        /// </returns>
        /// <remarks>
        /// Exceptions that are not <see cref="MQException"/> (for example an XML parse
        /// error on a malformed payload) are not caught here and propagate to the caller.
        /// </remarks>
        public DataSet receiveMsg()
        {
            if (!EnsureConnected())
            {
                MessageBox.Show(linkStatus);
                return null;
            }

            int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_SHARED | MQC.MQOO_INQUIRE;

            try
            {
                try
                {
                    // Try to open the queue.
                    queue = qMgr.AccessQueue(queueName, openOptions);
                }
                catch (MQException e)
                {
                    MessageBox.Show("打开队列失败:" + e.Message);
                    return null;
                }

                // Wait up to 15 seconds for a message to become available.
                mqMsg = new MQMessage();
                MQGetMessageOptions mqGetMsgOpts = new MQGetMessageOptions();
                mqGetMsgOpts.WaitInterval = 15000;
                mqGetMsgOpts.Options |= MQC.MQGMO_WAIT;

                // Skip the (blocking) get entirely when the queue reports no messages.
                if (queue.CurrentDepth <= 0)
                {
                    return null;
                }

                // Get the message from the queue.
                queue.Get(mqMsg, mqGetMsgOpts);

                // NOTE(review): the byte length is passed as the character count here;
                // confirm this is correct for multi-byte payloads under CCSID 1381.
                string message = mqMsg.ReadString(mqMsg.MessageLength);

                var ds = new DataSet();
                var table = new DataTable("T_School");
                table.Columns.Add("ID", typeof(string));
                table.Columns.Add("SchoolName", typeof(string));
                table.Columns.Add("BuildDate", typeof(string));
                table.Columns.Add("Address", typeof(string));
                ds.Tables.Add(table);

                using (var reader = new StringReader(message))
                {
                    ds.ReadXml(reader, XmlReadMode.Fragment);
                }

                return ds;
            }
            catch (MQException ex)
            {
                // ex.Message carries the MQ error text; InnerException is usually null.
                MessageBox.Show("访问队列停止" + ex.Message);
                return null;
            }
            finally
            {
                CloseQueueAndDisconnect();
            }
        }

        #endregion
    }
}