zoukankan      html  css  js  c++  java
  • [MQ]ActiveMQ消息收发简单例子

    例子中使用的消息类型是:MapMessage。代码比较简单,贴出来给大家看看:

    接收消息:

    /// <summary>
    /// Makes up to <c>Program.MessageCount</c> receive attempts on the queue named by
    /// <c>Program.NormalQueueDestination</c> and writes a throughput line to the debug
    /// output each time <c>Program.StatisticsMessageCountSpan</c> messages have been received.
    /// </summary>
    /// <remarks>
    /// Exceptions raised while receiving or converting a single message are written to the
    /// debug output and the loop continues with the next attempt.
    /// </remarks>
    private void Receive()
    {
        var factory = new ConnectionFactory(Program.BrokerUri);
        var timeout = new TimeSpan(0, 0, 10);
        using (var connection = factory.CreateConnection())
        using (var session = connection.CreateSession())
        {
            var destination = SessionUtil.GetDestination(session, Program.NormalQueueDestination);
            using (var consumer = session.CreateConsumer(destination))
            {
                connection.Start();
                var stopwatch = new Stopwatch();

                // Number of messages actually received in the current statistics window.
                // Counting from zero makes a reported window contain exactly
                // StatisticsMessageCountSpan messages (counting from one reported one too few).
                var receivedInWindow = 0;
                for (var i = 0; i < Program.MessageCount; i++)
                {
                    if (receivedInWindow == 0)
                    {
                        // No-op if the stopwatch is already running (e.g. after a timeout).
                        stopwatch.Start();
                    }

                    try
                    {
                        var received = consumer.Receive(timeout);
                        if (received == null)
                        {
                            // Receive timed out: nothing arrived, so do not count it as a
                            // message and do not hand a null to the converter.
                            continue;
                        }

                        var message = (IMapMessage)received;

                        // The conversion is part of the measured per-message work;
                        // its result is not needed here.
                        Common.GetMessageObjByIMapMessage(message);
                        receivedInWindow++;

                        if (receivedInWindow == Program.StatisticsMessageCountSpan)
                        {
                            stopwatch.Stop();
                            var spendSeconds = stopwatch.Elapsed.TotalSeconds;
                            var speed = Program.StatisticsMessageCountSpan / spendSeconds;
                            Debug.WriteLine("Receive " + Program.StatisticsMessageCountSpan + " Messages Spend:" + spendSeconds + " Seconds. (" + speed.ToString("0.00") + "/s)");

                            stopwatch.Reset();
                            receivedInWindow = 0;
                        }
                    }
                    catch (Exception ex)
                    {
                        Debug.WriteLine(ex);
                    }
                }
            }
        }
    }
    

     发送消息:

    /// <summary>
    /// Sends <c>Program.MessageCount</c> map messages to the queue named by
    /// <c>Program.NormalQueueDestination</c>, writes a throughput line to the debug output
    /// each time <c>Program.StatisticsMessageCountSpan</c> messages have been sent, and
    /// shows a message box when the loop has finished.
    /// </summary>
    /// <remarks>
    /// Exceptions raised while building or sending a single message are written to the
    /// debug output and the loop continues; a failed send is not counted in the statistics.
    /// </remarks>
    private void Send()
    {
        var factory = new ConnectionFactory(Program.BrokerUri);
        using (var connection = factory.CreateConnection())
        using (var session = connection.CreateSession())
        {
            var destination = SessionUtil.GetDestination(session, Program.NormalQueueDestination);
            using (var producer = session.CreateProducer(destination))
            {
                connection.Start();

                // Base value for the generated task ids; incremented before every send.
                var mediaTaskId = 100000000000000L;
                var stopwatch = new Stopwatch();

                // Number of messages successfully sent in the current statistics window.
                // Counting from zero makes a reported window contain exactly
                // StatisticsMessageCountSpan messages (counting from one reported one too few).
                var sentInWindow = 0;
                for (var i = 1; i <= Program.MessageCount; i++)
                {
                    if (sentInWindow == 0)
                    {
                        // No-op if the stopwatch is already running (e.g. after a failed send).
                        stopwatch.Start();
                    }

                    try
                    {
                        mediaTaskId++;
                        var message = session.CreateMapMessage();
                        Common.SetMapMessage(message, Common.GetMessageObj(mediaTaskId.ToString()));
                        producer.Send(message);
                        sentInWindow++;

                        if (sentInWindow == Program.StatisticsMessageCountSpan)
                        {
                            stopwatch.Stop();
                            var spendSeconds = stopwatch.Elapsed.TotalSeconds;
                            var speed = Program.StatisticsMessageCountSpan / spendSeconds;
                            Debug.WriteLine("Send " + Program.StatisticsMessageCountSpan + " Messages Spend:" + spendSeconds + " Seconds. (" + speed.ToString("0.00") + "/s)");

                            stopwatch.Reset();
                            sentInWindow = 0;
                        }
                    }
                    catch (Exception ex)
                    {
                        Debug.WriteLine(ex);
                    }
                }
            }
        }

        MessageBox.Show(@"Send Done!");
    }

    简单监听封装类:

    using System;
    using System.Diagnostics;
    using System.Threading;
    using Apache.NMS;
    
    namespace ActiveMQ.PerformanceTest
    {
        /// <summary>
        /// Polls an <see cref="IMessageConsumer"/> in a blocking loop and raises
        /// <see cref="MessageReceived"/> for every message it receives.
        /// </summary>
        /// <remarks>
        /// <see cref="Start"/> runs the loop on the calling thread and does not return until
        /// <see cref="Stop"/> has been called, so <see cref="Stop"/> has to be called from a
        /// different thread (or from inside a <see cref="MessageReceived"/> handler).
        /// </remarks>
        public class ActiveMqListener
        {
            /// <summary>Raised on the listening thread for every non-null message received.</summary>
            public event MessageReceivedEventHandler MessageReceived;

            private readonly IMessageConsumer _messageConsumer;
            private readonly TimeSpan _timeout;
            private readonly int _sleepMinutes;

            // Written by Stop() while the loop in StartListening() reads it, normally from
            // another thread; volatile keeps the loop from working with a stale cached value.
            private volatile bool _listen;

            /// <summary>Creates a listener over an existing consumer.</summary>
            /// <param name="messageConsumer">Consumer to poll for messages.</param>
            /// <param name="timeout">Maximum time a single receive call waits for a message.</param>
            /// <param name="sleepMinutes">Minutes to sleep after a receive call returned no message.</param>
            public ActiveMqListener(IMessageConsumer messageConsumer, TimeSpan timeout, int sleepMinutes)
            {
                _messageConsumer = messageConsumer;
                _timeout = timeout;
                _sleepMinutes = sleepMinutes;
            }

            /// <summary>
            /// Starts listening. Blocks the calling thread until <see cref="Stop"/> is called.
            /// </summary>
            public void Start()
            {
                _listen = true;

                StartListening();
            }

            /// <summary>
            /// Asks the listening loop to end. The loop only checks the flag between
            /// iterations, so a receive call or sleep already in progress finishes first.
            /// </summary>
            public void Stop()
            {
                _listen = false;
            }

            // Receive loop: dispatch every message; back off when the queue yields nothing.
            private void StartListening()
            {
                while (_listen)
                {
                    var message = _messageConsumer.Receive(_timeout);
                    if (message == null)
                    {
                        Debug.WriteLine("[%Notice:ActiveMqListener Start " + _sleepMinutes + " Minutes Sleep.%]");

                        // TimeSpan arithmetic avoids the silent int overflow that
                        // "_sleepMinutes * 1000 * 60" produces for large minute values.
                        Thread.Sleep(TimeSpan.FromMinutes(_sleepMinutes));
                    }
                    else
                    {
                        FireReceiveEvent(message);
                    }
                }
            }

            // Raises MessageReceived with the given message as payload.
            private void FireReceiveEvent(object message)
            {
                // Copy the delegate first so that a subscriber removing itself on another
                // thread between the null check and the call cannot cause a NullReferenceException.
                var handler = MessageReceived;
                if (handler != null)
                {
                    handler(this, new MessageEventArgs(message));
                }
            }
        }
    
        /// <summary>
        /// Signature of handlers for the event that reports a received message.
        /// </summary>
        /// <param name="sender">Object that raised the event.</param>
        /// <param name="args">Event data carrying the received message.</param>
        public delegate void MessageReceivedEventHandler(object sender, MessageEventArgs args);

        /// <summary>
        /// Event data that carries a single received message as an untyped object.
        /// </summary>
        public class MessageEventArgs : EventArgs
        {
            // Set once in the constructor and never changed afterwards.
            private readonly object _payload;

            /// <summary>Creates event data for the given message.</summary>
            /// <param name="message">The message to expose through <see cref="Message"/>.</param>
            public MessageEventArgs(object message)
            {
                this._payload = message;
            }

            /// <summary>The message object that was passed to the constructor.</summary>
            public object Message
            {
                get
                {
                    return this._payload;
                }
            }
        }
    }
    

     完整代码下载地址:

    https://files.cnblogs.com/CopyPaster/ActiveMQ.PerformanceTest.rar

  • 相关阅读:
    《经济学通识》六、生命有限
    《经济学通识》五、反垄断的罪与罚
    《经济学通识》四、贸易与互惠
    《经济学通识》三、价格与市场
    《经济学通识》二、管制的愿望与结果
    《经济学通识》一、前言
    《必然》十三、开始,正从脚下开始
    《必然》十二、一个好问题足以改变世界
    字符串之strstr
    STL之内存处理工具
  • 原文地址:https://www.cnblogs.com/CopyPaster/p/2473205.html
Copyright © 2011-2022 走看看