zoukankan      html  css  js  c++  java
  • [MQ]ActiveMQ消息收发简单例子

    例子中使用的消息类型是:MapMessage。代码比较简单,贴出来给大家看看:

    接收消息:

    /// <summary>
    /// Makes up to <c>Program.MessageCount</c> receive attempts on the queue named by
    /// <c>Program.NormalQueueDestination</c> and writes a throughput line to the debug
    /// output each time <c>Program.StatisticsMessageCountSpan</c> messages have
    /// actually been received.
    /// </summary>
    /// <remarks>
    /// Each attempt waits at most 10 seconds. An attempt that times out yields no
    /// message and is not counted towards the statistics window. Exceptions raised
    /// while handling a single message are written to the debug output and the loop
    /// continues with the next attempt.
    /// </remarks>
    private void Receive()
            {
                var factory = new ConnectionFactory(Program.BrokerUri);
                var timeout = TimeSpan.FromSeconds(10);
                using (var connection = factory.CreateConnection())
                {
                    using (var session = connection.CreateSession())
                    {
                        var destination = SessionUtil.GetDestination(session, Program.NormalQueueDestination);
                        using (var consumer = session.CreateConsumer(destination))
                        {
                            connection.Start();
                            var stopwatch = new Stopwatch();

                            // Number of messages received since the last statistics line.
                            var receivedInWindow = 0;
                            for (var i = 0; i < Program.MessageCount; i++)
                            {
                                // Starting an already running stopwatch is a no-op, so this is
                                // safe when earlier attempts in the same window timed out.
                                if (receivedInWindow == 0)
                                {
                                    stopwatch.Start();
                                }

                                try
                                {
                                    var message = (IMapMessage)consumer.Receive(timeout);
                                    if (message == null)
                                    {
                                        // Timed out: nothing was received, so nothing is counted.
                                        continue;
                                    }

                                    receivedInWindow++;

                                    // The conversion result is not used here; the call is kept so
                                    // that its cost stays part of the measured receive time.
                                    Common.GetMessageObjByIMapMessage(message);

                                    if (receivedInWindow == Program.StatisticsMessageCountSpan)
                                    {
                                        stopwatch.Stop();
                                        var spendSeconds = stopwatch.Elapsed.TotalSeconds;
                                        var speed = Program.StatisticsMessageCountSpan / spendSeconds;
                                        Debug.WriteLine("Receive " + Program.StatisticsMessageCountSpan + " Messages Spend:" + spendSeconds + " Seconds. (" + speed.ToString("0.00") + "/s)");

                                        stopwatch.Reset();
                                        receivedInWindow = 0;
                                    }
                                }
                                catch (Exception ex)
                                {
                                    Debug.WriteLine(ex);
                                }
                            }
                        }
                    }
                }
            }
    

     发送消息:

    /// <summary>
    /// Sends <c>Program.MessageCount</c> map messages to the queue named by
    /// <c>Program.NormalQueueDestination</c>, writes a throughput line to the debug
    /// output each time <c>Program.StatisticsMessageCountSpan</c> messages have been
    /// sent, and shows a message box once the loop has finished.
    /// </summary>
    /// <remarks>
    /// An exception raised while building or sending a single message is written to
    /// the debug output; that message is not counted and the loop continues.
    /// </remarks>
    private void Send()
            {
                var factory = new ConnectionFactory(Program.BrokerUri);
                using (var connection = factory.CreateConnection())
                {
                    using (var session = connection.CreateSession())
                    {
                        var destination = SessionUtil.GetDestination(session, Program.NormalQueueDestination);
                        using (var producer = session.CreateProducer(destination))
                        {
                            connection.Start();

                            // Explicitly long: the starting value does not fit in an int.
                            long mediaTaskId = 100000000000000L;
                            var stopwatch = new Stopwatch();

                            // Number of messages sent since the last statistics line.
                            var sentInWindow = 0;
                            for (var i = 1; i <= Program.MessageCount; i++)
                            {
                                if (sentInWindow == 0)
                                {
                                    stopwatch.Start();
                                }

                                try
                                {
                                    mediaTaskId++;
                                    var message = session.CreateMapMessage();
                                    Common.SetMapMessage(message, Common.GetMessageObj(mediaTaskId.ToString()));
                                    producer.Send(message);

                                    // Count only after Send returned without throwing.
                                    sentInWindow++;

                                    if (sentInWindow == Program.StatisticsMessageCountSpan)
                                    {
                                        stopwatch.Stop();
                                        var spendSeconds = stopwatch.Elapsed.TotalSeconds;
                                        var speed = Program.StatisticsMessageCountSpan / spendSeconds;
                                        Debug.WriteLine("Send " + Program.StatisticsMessageCountSpan + " Messages Spend:" + spendSeconds + " Seconds. (" + speed.ToString("0.00") + "/s)");

                                        stopwatch.Reset();
                                        sentInWindow = 0;
                                    }
                                }
                                catch (Exception ex)
                                {
                                    Debug.WriteLine(ex);
                                }
                            }
                        }
                    }
                }

                MessageBox.Show(@"Send Done!");
            }

    简单监听封装类:

    using System;
    using System.Diagnostics;
    using System.Threading;
    using Apache.NMS;
    
    namespace ActiveMQ.PerformanceTest
    {
        /// <summary>
        /// Polls an <see cref="IMessageConsumer"/> in a loop and raises
        /// <see cref="MessageReceived"/> for every message it returns.
        /// </summary>
        /// <remarks>
        /// <see cref="Start"/> runs the polling loop on the calling thread and only
        /// returns after <see cref="Stop"/> has been called, so <see cref="Stop"/> has to
        /// be invoked from another thread or from inside a <see cref="MessageReceived"/>
        /// handler.
        /// </remarks>
        public class ActiveMqListener
        {
            /// <summary>Raised on the polling thread for every received message.</summary>
            public event MessageReceivedEventHandler MessageReceived;

            // Upper bound for one slice of the idle sleep, so Stop() is noticed promptly.
            private static readonly TimeSpan SleepSlice = TimeSpan.FromSeconds(1);

            private readonly IMessageConsumer _messageConsumer;
            private readonly TimeSpan _timeout;
            private readonly int _sleepMinutes;

            // Written by Stop() and read by the polling loop, typically on different
            // threads; volatile keeps the loop from working with a stale cached value.
            private volatile bool _listen;

            /// <summary>Creates a listener around an existing consumer.</summary>
            /// <param name="messageConsumer">Consumer that is polled for messages.</param>
            /// <param name="timeout">Maximum wait for a single receive attempt.</param>
            /// <param name="sleepMinutes">
            /// Minutes to pause after a receive attempt returned no message.
            /// </param>
            /// <exception cref="ArgumentNullException">
            /// <paramref name="messageConsumer"/> is null.
            /// </exception>
            /// <exception cref="ArgumentOutOfRangeException">
            /// <paramref name="sleepMinutes"/> is negative.
            /// </exception>
            public ActiveMqListener(IMessageConsumer messageConsumer, TimeSpan timeout, int sleepMinutes)
            {
                if (messageConsumer == null)
                {
                    throw new ArgumentNullException("messageConsumer");
                }

                if (sleepMinutes < 0)
                {
                    throw new ArgumentOutOfRangeException("sleepMinutes");
                }

                _messageConsumer = messageConsumer;
                _timeout = timeout;
                _sleepMinutes = sleepMinutes;
            }

            /// <summary>
            /// Starts polling on the calling thread; blocks until <see cref="Stop"/> is called.
            /// </summary>
            public void Start()
            {
                _listen = true;

                StartListening();
            }

            /// <summary>
            /// Asks the polling loop to end. The loop exits after the receive attempt or
            /// sleep slice that is currently in progress.
            /// </summary>
            public void Stop()
            {
                _listen = false;
            }

            // Polls the consumer until Stop() is called, pausing whenever it is idle.
            private void StartListening()
            {
                while (_listen)
                {
                    var message = _messageConsumer.Receive(_timeout);
                    if (message == null)
                    {
                        Debug.WriteLine("[%Notice:ActiveMqListener Start " + _sleepMinutes + " Minutes Sleep.%]");
                        SleepWhileListening(TimeSpan.FromMinutes(_sleepMinutes));
                    }
                    else
                    {
                        FireReceiveEvent(message);
                    }
                }
            }

            // Sleeps for the given duration in short slices and returns early once Stop()
            // has been called. Working with TimeSpan also avoids the int overflow of a
            // "minutes * 1000 * 60" millisecond computation for large values.
            private void SleepWhileListening(TimeSpan duration)
            {
                var remaining = duration;
                while (_listen && remaining > TimeSpan.Zero)
                {
                    var slice = remaining < SleepSlice ? remaining : SleepSlice;
                    Thread.Sleep(slice);
                    remaining -= slice;
                }
            }

            // Raises MessageReceived. The delegate is copied to a local first so that a
            // subscriber removing itself between the null check and the call cannot
            // cause a NullReferenceException.
            private void FireReceiveEvent(object message)
            {
                var handler = MessageReceived;
                if (handler != null)
                {
                    handler(this, new MessageEventArgs(message));
                }
            }
        }
    
        /// <summary>
        /// Signature of handlers for the <c>ActiveMqListener.MessageReceived</c> event.
        /// </summary>
        /// <param name="sender">The object raising the event.</param>
        /// <param name="args">Event data carrying the received message.</param>
        public delegate void MessageReceivedEventHandler(object sender, MessageEventArgs args);
    
        /// <summary>
        /// Event data that carries a single message object, assigned once at construction.
        /// </summary>
        public class MessageEventArgs : EventArgs
        {
            /// <summary>Wraps the given message.</summary>
            /// <param name="message">The message to expose through <see cref="Message"/>.</param>
            public MessageEventArgs(object message)
            {
                Message = message;
            }

            /// <summary>The message passed to the constructor.</summary>
            public object Message { get; private set; }
        }
    }
    

     完整代码下载地址:

    https://files.cnblogs.com/CopyPaster/ActiveMQ.PerformanceTest.rar

  • 相关阅读:
    UVa532 Dungeon Master 三维迷宫
    6.4.2 走迷宫
    UVA 439 Knight Moves
    UVa784 Maze Exploration
    UVa657 The die is cast
    UVa572 Oil Deposits DFS求连通块
    UVa10562 Undraw the Trees
    UVa839 Not so Mobile
    327
    UVa699 The Falling Leaves
  • 原文地址:https://www.cnblogs.com/CopyPaster/p/2473205.html
Copyright © 2011-2022 走看看