zoukankan      html  css  js  c++  java
  • ActiveMQ在C#中的应用

    ActiveMQ是个好东东,不必多说。ActiveMQ提供多种语言支持,如 Java, C, C++, C#, Ruby, Perl, Python, PHP等。由于我在windows下开发GUI,比较关心C++和C#,其中C#的ActiveMQ很简单,Apache提供NMS(.Net Messaging Service)支持.Net开发,只需如下几个步骤即能建立简单的实现。C++的应用相对麻烦些,稍后写文章介绍。

    1、去ActiveMQ官方网站下载最新版的ActiveMQ,网址:http://activemq.apache.org/download.html。我之前下的是5.3.1,5.3.2现在也已经出来了。

    2、去ActiveMQ官方网站下载最新版的Apache.NMS,网址:http://activemq.apache.org/nms/download.html, 需要下载Apache.NMS和Apache.NMS.ActiveMQ两个bin包,如果对源码感兴趣,也可下载src包。这里要提醒一下,如果下载 1.2.0版本的NMS.ActiveMQ,Apache.NMS.ActiveMQ.dll在实际使用中有个bug,即停止ActiveMQ应用时会抛 WaitOne函数异常,查看src包中的源码发现是由于Apache.NMS.ActiveMQ-1.2.0-src\src\main\csharp \Transport\InactivityMonitor.cs中的如下代码造成的,修改一下源码重新编译即可。看了一下最新版1.3.0已经修复了这 个bug,因此下载最新版即可。

    view plaincopy to clipboardprint?
    private void StopMonitorThreads()  
            {  
                lock(monitor)  
                {  
                    if(monitorStarted.CompareAndSet(true, false))  
                    {  
                        AutoResetEvent shutdownEvent = new AutoResetEvent(false);  

                        // Attempt to wait for the Timers to shutdown, but don't wait  
                        // forever, if they don't shutdown after two seconds, just quit.  
                        this.readCheckTimer.Dispose(shutdownEvent);  
                        shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));  
                        this.writeCheckTimer.Dispose(shutdownEvent);  
                        shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));  
                          
                                                        //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)  
                          
                        this.asyncTasks.Shutdown();  
                        this.asyncTasks = null;  
                        this.asyncWriteTask = null;  
                        this.asyncErrorTask = null;  
                    }  
                }  
            }
    private void StopMonitorThreads()
            {
                lock(monitor)
                {
                    if(monitorStarted.CompareAndSet(true, false))
                    {
                        AutoResetEvent shutdownEvent = new AutoResetEvent(false);

                        // Attempt to wait for the Timers to shutdown, but don't wait
                        // forever, if they don't shutdown after two seconds, just quit.
                        this.readCheckTimer.Dispose(shutdownEvent);
                        shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
                        this.writeCheckTimer.Dispose(shutdownEvent);
                        shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
                       
                          //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)
         
                        this.asyncTasks.Shutdown();
                        this.asyncTasks = null;
                        this.asyncWriteTask = null;
                        this.asyncErrorTask = null;
                    }
                }
            }

    3、运行ActiveMQ,找到ActiveMQ解压后的bin文件夹:...\apache-activemq-5.3.1\bin,执行activemq.bat批处理文件即可启动ActiveMQ服务器,默认端口为61616,这可在配置文件中修改。

    4、写C#程序实现ActiveMQ的简单应用。新建C#工程(一个Producter项目和一个Consumer项目),WinForm或 Console程序均可,这里建的是Console工程,添加对Apache.NMS.dll和Apache.NMS.ActiveMQ.dll的引用, 然后即可编写实现代码了,简单的Producer和Consumer实现代码如下:

    producer:

    view plaincopy to clipboardprint?
    using System;  
    using System.Collections.Generic;  
    using System.Text;  
    using Apache.NMS;  
    using Apache.NMS.ActiveMQ;  
    using System.IO;  
    using System.Xml.Serialization;  
    using System.Runtime.Serialization.Formatters.Binary;  

    namespace Publish  
    {  
        class Program  
        {  
            static void Main(string[] args)  
            {  
                try
                {  
                    //Create the Connection Factory  
                    IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");  
                    using (IConnection connection = factory.CreateConnection())  
                    {  
                        //Create the Session  
                        using (ISession session = connection.CreateSession())  
                        {  
                            //Create the Producer for the topic/queue  
                            IMessageProducer prod = session.CreateProducer(  
                                new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));  
                              
                            //Send Messages  
                            int i = 0;  

                            while (!Console.KeyAvailable)  
                            {  
                                ITextMessage msg = prod.CreateTextMessage();  
                                msg.Text = i.ToString();  
                                Console.WriteLine("Sending: " + i.ToString());  
                                prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);  

                                System.Threading.Thread.Sleep(5000);  
                                i++;  
                            }  
                        }  
                    }  

                    Console.ReadLine();  
               }  
                catch (System.Exception e)  
                {  
                    Console.WriteLine("{0}",e.Message);  
                    Console.ReadLine();  
                }  
            }  
        }  
    }
    using System;
    using System.Collections.Generic;
    using System.Text;
    using Apache.NMS;
    using Apache.NMS.ActiveMQ;
    using System.IO;
    using System.Xml.Serialization;
    using System.Runtime.Serialization.Formatters.Binary;

    namespace Publish
    {
        class Program
        {
            static void Main(string[] args)
            {
                try
                {
                    //Create the Connection Factory
                    IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                    using (IConnection connection = factory.CreateConnection())
                    {
                        //Create the Session
                        using (ISession session = connection.CreateSession())
                        {
                            //Create the Producer for the topic/queue
                            IMessageProducer prod = session.CreateProducer(
                                new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
                           
                            //Send Messages
                            int i = 0;

                            while (!Console.KeyAvailable)
                            {
                                ITextMessage msg = prod.CreateTextMessage();
                                msg.Text = i.ToString();
                                Console.WriteLine("Sending: " + i.ToString());
                                prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

                                System.Threading.Thread.Sleep(5000);
                                i++;
                            }
                        }
                    }

                    Console.ReadLine();
               }
                catch (System.Exception e)
                {
                    Console.WriteLine("{0}",e.Message);
                    Console.ReadLine();
                }
            }
        }
    }

    consumer:

    view plaincopy to clipboardprint?
    using System;  
    using System.Collections.Generic;  
    using System.Text;  
    using Apache.NMS;  
    using Apache.NMS.ActiveMQ;  
    using System.IO;  
    using System.Xml.Serialization;  
    using System.Runtime.Serialization.Formatters.Binary;  

    namespace Subscribe  
    {  
        class Program  
        {  
            static void Main(string[] args)  
            {  
                try
                {  
                    //Create the Connection factory  
                    IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");  
                      
                    //Create the connection  
                    using (IConnection connection = factory.CreateConnection())  
                    {  
                        connection.ClientId = "testing listener";  
                        connection.Start();  

                        //Create the Session  
                        using (ISession session = connection.CreateSession())  
                        {  
                            //Create the Consumer  
                            IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);  
                              
                            consumer.Listener += new MessageListener(consumer_Listener);  

                            Console.ReadLine();  
                        }  
                        connection.Stop();  
                        connection.Close();  
                    }  
                }  
                catch (System.Exception e)  
                {  
                    Console.WriteLine(e.Message);  
                }  
            }  

            static void consumer_Listener(IMessage message)  
            {  
                try
                {  
                    ITextMessage msg = (ITextMessage)message;  
                    Console.WriteLine("Receive: " + msg.Text);  
               }  
                catch (System.Exception e)  
                {  
                    Console.WriteLine(e.Message);  
                }  
            }  
        }  
    }
    using System;
    using System.Collections.Generic;
    using System.Text;
    using Apache.NMS;
    using Apache.NMS.ActiveMQ;
    using System.IO;
    using System.Xml.Serialization;
    using System.Runtime.Serialization.Formatters.Binary;

    namespace Subscribe
    {
        class Program
        {
            static void Main(string[] args)
            {
                try
                {
                    //Create the Connection factory
                    IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                   
                    //Create the connection
                    using (IConnection connection = factory.CreateConnection())
                    {
                        connection.ClientId = "testing listener";
                        connection.Start();

                        //Create the Session
                        using (ISession session = connection.CreateSession())
                        {
                            //Create the Consumer
                            IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);
                           
                            consumer.Listener += new MessageListener(consumer_Listener);

                            Console.ReadLine();
                        }
                        connection.Stop();
                        connection.Close();
                    }
                }
                catch (System.Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }

            static void consumer_Listener(IMessage message)
            {
                try
                {
                    ITextMessage msg = (ITextMessage)message;
                    Console.WriteLine("Receive: " + msg.Text);
               }
                catch (System.Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }

    程序实现的功能:生产者producer建立名为testing的主题,并每隔5秒向该主题发送消息,消费者consumer订阅了testing 主题,因此只要生产者发送testing主题的消息到ActiveMQ服务器,服务器就将该消息发送给订阅了testing主题的消费者。

    编译生成producer.exe和consumer.exe,并执行两个exe,即可看到消息的发送与接收了。

    这个例子是建的主题(Topic),ActiveMQ还支持另一种方式:Queue,即P2P,两者有什么区别呢?区别在于,Topic是广播,即 如果某个Topic被多个消费者订阅,那么只要有消息到达服务器,服务器就将该消息发给全部的消费者;而Queue是点到点,即一个消息只能发给一个消费 者,如果某个Queue被多个消费者订阅,没有特殊情况的话消息会一个一个地轮流发给不同的消费者,比如:

    msg1-->consumer A


    msg2-->consumer B

    msg3-->consumer C

    msg4-->consumer A

    msg5-->consumer B

    msg6-->consumer C

    特殊情况是指:ActiveMQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只 有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。

    Properties和Selector该如何设置呢?请看如下代码:

    producer:


    view plaincopy to clipboardprint?
    ITextMessage msg = prod.CreateTextMessage();  
                                msg.Text = i.ToString();  
                                msg.Properties.SetString("myFilter", "test1");  
                                Console.WriteLine("Sending: " + i.ToString());  
                                prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
    ITextMessage msg = prod.CreateTextMessage();
                                msg.Text = i.ToString();
                                msg.Properties.SetString("myFilter", "test1");
                                Console.WriteLine("Sending: " + i.ToString());
                                prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

    consumer:

    view plaincopy to clipboardprint?
    //生成consumer时通过参数设置Selector  
    IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");
    //生成consumer时通过参数设置Selector
    IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");

    本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/bodybo/archive/2010/06/04/5647968.aspx

  • 相关阅读:
    Anagram
    HDU 1205 吃糖果(鸽巢原理)
    Codeforces 1243D 0-1 MST(补图的连通图数量)
    Codeforces 1243C Tile Painting(素数)
    Codeforces 1243B2 Character Swap (Hard Version)
    Codeforces 1243B1 Character Swap (Easy Version)
    Codeforces 1243A Maximum Square
    Codeforces 1272E Nearest Opposite Parity(BFS)
    Codeforces 1272D Remove One Element
    Codeforces 1272C Yet Another Broken Keyboard
  • 原文地址:https://www.cnblogs.com/wpcnblog/p/2096989.html
Copyright © 2011-2022 走看看