例子中使用的消息类型是:MapMessage。代码比较简单,贴出来给大家看看:
接收消息:
/// <summary>
/// Consumes messages from the queue named by <c>Program.NormalQueueDestination</c> and writes
/// a throughput line to the debug output after every <c>Program.StatisticsMessageCountSpan</c>
/// messages actually received.
/// </summary>
/// <remarks>
/// The loop makes at most <c>Program.MessageCount</c> receive attempts, each waiting up to
/// 10 seconds. An attempt that times out (null result) still consumes one iteration but is
/// not counted in the statistics. Exceptions raised while handling a message are written to
/// the debug output and the loop continues with the next attempt.
/// </remarks>
private void Receive()
{
    var factory = new ConnectionFactory(Program.BrokerUri);
    var timeout = new TimeSpan(0, 0, 10);

    using (var connection = factory.CreateConnection())
    using (var session = connection.CreateSession())
    {
        var destination = SessionUtil.GetDestination(session, Program.NormalQueueDestination);

        using (var consumer = session.CreateConsumer(destination))
        {
            connection.Start();

            var stopwatch = new Stopwatch();

            // Number of messages received in the current statistics window.
            // Zero-based so that a window holds exactly StatisticsMessageCountSpan messages,
            // which is what the log line and the speed calculation claim.
            var receivedInWindow = 0;

            for (var i = 0; i < Program.MessageCount; i++)
            {
                if (receivedInWindow == 0)
                {
                    // Start() is a no-op when the stopwatch is already running.
                    stopwatch.Start();
                }

                try
                {
                    // Hard cast on purpose: an unexpected message type surfaces as an
                    // InvalidCastException in the debug output instead of being dropped silently.
                    var message = (IMapMessage)consumer.Receive(timeout);
                    if (message == null)
                    {
                        // Nothing arrived within the timeout; do not count it as a received message.
                        continue;
                    }

                    receivedInWindow++;

                    var messageObj = Common.GetMessageObjByIMapMessage(message);
                    if (messageObj != null)
                    {
                        //Debug.WriteLine(messageObj.MediaTaskId);
                    }

                    if (receivedInWindow == Program.StatisticsMessageCountSpan)
                    {
                        stopwatch.Stop();

                        var spendSeconds = stopwatch.Elapsed.TotalSeconds;
                        var speed = Program.StatisticsMessageCountSpan / spendSeconds;
                        Debug.WriteLine("Receive " + Program.StatisticsMessageCountSpan + " Messages Spend:" + spendSeconds + " Seconds. (" + speed.ToString("0.00") + "/s)");

                        // Begin a fresh window.
                        stopwatch.Reset();
                        receivedInWindow = 0;
                    }
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex);
                }
            }
        }
    }
}
发送消息:
/// <summary>
/// Sends <c>Program.MessageCount</c> map messages to the queue named by
/// <c>Program.NormalQueueDestination</c> and writes a throughput line to the debug output
/// after every <c>Program.StatisticsMessageCountSpan</c> messages successfully sent.
/// </summary>
/// <remarks>
/// Each message carries a task id taken from a counter that is advanced once per loop
/// iteration. A failed send is written to the debug output, is not counted in the
/// statistics, and does not stop the loop. A "Send Done!" message box is shown after the
/// connection has been disposed.
/// </remarks>
private void Send()
{
    var factory = new ConnectionFactory(Program.BrokerUri);

    using (var connection = factory.CreateConnection())
    using (var session = connection.CreateSession())
    {
        var destination = SessionUtil.GetDestination(session, Program.NormalQueueDestination);

        using (var producer = session.CreateProducer(destination))
        {
            connection.Start();

            // The literal exceeds the int range, so the counter is a long.
            long mediaTaskId = 100000000000000L;
            var stopwatch = new Stopwatch();

            // Number of messages sent in the current statistics window.
            // Zero-based so that a window holds exactly StatisticsMessageCountSpan messages,
            // which is what the log line and the speed calculation claim.
            var sentInWindow = 0;

            for (var i = 1; i <= Program.MessageCount; i++)
            {
                if (sentInWindow == 0)
                {
                    // Start() is a no-op when the stopwatch is already running.
                    stopwatch.Start();
                }

                try
                {
                    mediaTaskId++;

                    var message = session.CreateMapMessage();
                    Common.SetMapMessage(message, Common.GetMessageObj(mediaTaskId.ToString()));
                    producer.Send(message);

                    // Only reached when the send did not throw.
                    sentInWindow++;

                    if (sentInWindow == Program.StatisticsMessageCountSpan)
                    {
                        stopwatch.Stop();

                        var spendSeconds = stopwatch.Elapsed.TotalSeconds;
                        var speed = Program.StatisticsMessageCountSpan / spendSeconds;
                        Debug.WriteLine("Send " + Program.StatisticsMessageCountSpan + " Messages Spend:" + spendSeconds + " Seconds. (" + speed.ToString("0.00") + "/s)");

                        // Begin a fresh window.
                        stopwatch.Reset();
                        sentInWindow = 0;
                    }
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex);
                }
            }
        }
    }

    MessageBox.Show(@"Send Done!");
}
简单监听封装类:
using System;
using System.Diagnostics;
using System.Threading;
using Apache.NMS;

namespace ActiveMQ.PerformanceTest
{
    /// <summary>
    /// Polls an <see cref="IMessageConsumer"/> in a loop and raises
    /// <see cref="MessageReceived"/> for every message it returns.
    /// </summary>
    public class ActiveMqListener
    {
        /// <summary>Raised on the listening thread for each message returned by the consumer.</summary>
        public event MessageReceivedEventHandler MessageReceived;

        private readonly IMessageConsumer _messageConsumer;
        private readonly TimeSpan _timeout;
        private readonly int _sleepMinutes;

        // volatile: Start() blocks its caller inside the polling loop, so Stop() is normally
        // called from a different thread and its write must be observed by the loop.
        private volatile bool _listen;

        /// <summary>Creates a listener over the given consumer.</summary>
        /// <param name="messageConsumer">Consumer to poll for messages.</param>
        /// <param name="timeout">Maximum time a single receive call waits for a message.</param>
        /// <param name="sleepMinutes">Minutes to sleep after a receive call returned no message.</param>
        public ActiveMqListener(IMessageConsumer messageConsumer, TimeSpan timeout, int sleepMinutes)
        {
            _messageConsumer = messageConsumer;
            _timeout = timeout;
            _sleepMinutes = sleepMinutes;
        }

        /// <summary>
        /// Starts listening. This call runs the polling loop on the calling thread and
        /// returns only after <see cref="Stop"/> has been called.
        /// </summary>
        public void Start()
        {
            _listen = true;
            StartListening();
        }

        /// <summary>
        /// Requests the polling loop to end. The loop checks the flag once per iteration,
        /// so it exits after the receive call or sleep currently in progress has finished.
        /// </summary>
        public void Stop()
        {
            _listen = false;
        }

        // Polling loop: dispatch each received message, back off when the queue is idle.
        private void StartListening()
        {
            while (_listen)
            {
                var message = _messageConsumer.Receive(_timeout);
                if (message != null)
                {
                    RaiseMessageReceived(message);
                    continue;
                }

                Debug.WriteLine("[%Notice:ActiveMqListener Start " + _sleepMinutes + " Minutes Sleep.%]");

                // TimeSpan avoids the silent int overflow of "minutes * 1000 * 60" for large values;
                // out-of-range durations are rejected by Thread.Sleep instead of wrapping around.
                Thread.Sleep(TimeSpan.FromMinutes(_sleepMinutes));
            }
        }

        // Raises MessageReceived. The delegate is copied to a local first so that a handler
        // unsubscribing on another thread between the null check and the call cannot cause
        // a NullReferenceException.
        private void RaiseMessageReceived(object message)
        {
            var handler = MessageReceived;
            if (handler != null)
            {
                handler(this, new MessageEventArgs(message));
            }
        }
    }

    /// <summary>Signature of handlers for <see cref="ActiveMqListener.MessageReceived"/>.</summary>
    /// <param name="sender">The listener raising the event.</param>
    /// <param name="args">Event data carrying the received message.</param>
    public delegate void MessageReceivedEventHandler(object sender, MessageEventArgs args);

    /// <summary>Event data carrying a received message.</summary>
    public class MessageEventArgs : EventArgs
    {
        private readonly object _message;

        /// <summary>The message passed to the constructor.</summary>
        public object Message
        {
            get { return _message; }
        }

        /// <summary>Wraps the given message.</summary>
        /// <param name="message">The received message.</param>
        public MessageEventArgs(object message)
        {
            _message = message;
        }
    }
}
完整代码下载地址:
https://files.cnblogs.com/CopyPaster/ActiveMQ.PerformanceTest.rar