zoukankan      html  css  js  c++  java
  • ActiveMQ C#实例

    重点参考:NMS Documentation

    一、ActiveMQ Queue

    在ActiveMQ中Queue是一种点对点的消息分发方式,生产者在队列中添加一条消息,然后消费者消费一条消息,这条消息保证送达并且只会被一个消费者接收

    生产者

      class ActiveMQSend
        {
            // Example connection strings:
            //    activemq:tcp://activemqhost:61616    //localhost
            //    stomp:tcp://activemqhost:61613
            //    ems:tcp://tibcohost:7222
            //    msmq://localhost
    
            string activemqIP = ConfigurationManager.AppSettings["activemqIP"];
            string SendQueue = ConfigurationManager.AppSettings["SendQueue"];
            HyExcel.ExcelHelper helper = new HyExcel.ExcelHelper();
    
            /// <summary>
            /// 发送消息到队列
            /// </summary>
            public void Send(string sendmessage)
            {
                try
                {
                    Uri connecturi = new Uri(activemqIP);
    
                    // NOTE: ensure the nmsprovider-activemq.config file exists in the executable folder.
                    IConnectionFactory factory = new NMSConnectionFactory(connecturi);
                    using (IConnection connection = factory.CreateConnection())
                    using (ISession session = connection.CreateSession())
                    {
                        //Defaults to queue if type is not specified:
                        IDestination sendDestination = SessionUtil.GetDestination(session, SendQueue);  //发送目的地
    
                        // Create a consumer and producer
                        using (IMessageProducer producer = session.CreateProducer(sendDestination))
                        {
                            // Start the connection so that messages will be processed.
                            connection.ExceptionListener += Connection_ExceptionListener; ;
                            connection.Start();
    
                            producer.DeliveryMode = MsgDeliveryMode.Persistent; //消息持久化(到本地文件),消费者可以随时取到数据,而未持久化的发送数据在activemq服务重启之后数据是会清掉的。
    
                            // Send a message
                            ITextMessage request = session.CreateTextMessage(sendmessage);
                            request.NMSCorrelationID = "abc";
                            request.Properties["NMSXGroupID"] = "cheese";
                            request.Properties["myHeader"] = "Cheddar";
                            producer.Send(request);
    
    
                            string fullPath = Environment.CurrentDirectory + "\" + DateTime.Now.ToString("yyyyMMddhhmmss") + ".txt";
                            helper.WriteFile(fullPath, "云端同步数据:" + sendmessage);
                        }
                    }
                }
                catch (Exception ex)
                {
                    string fullPath = Environment.CurrentDirectory + "\" + DateTime.Now.ToString("yyyyMMddhhmmss") + ".txt";
                    helper.WriteFile(fullPath, ex.Message);
                }
            }
    
            private void Connection_ExceptionListener(Exception exception)
            {
                Console.WriteLine("生产者发生异常:{0}", exception);
            }
        }
    View Code

    消费者

    class ActiveMQReceive
        {
            public static void CloudSyncThread(string threadName)
            {
                Console.WriteLine(threadName + "开始工作!");
                IConnection connection = null;
                try
                {
                    string activemqIP = ConfigurationManager.AppSettings["cloudConsumerip"];
                    Uri connecturi = new Uri(activemqIP);
                    // NOTE: ensure the nmsprovider-activemq.config file exists in the executable folder.
                    //IConnectionFactory factory = new NMSConnectionFactory(connecturi);
                    IConnectionFactory factory = new NetTxConnectionFactory(connecturi); //可以支持failover:tcp://
                    connection = factory.CreateConnection();
                    ISession session = connection.CreateSession();
                    //接收源地址
                    IDestination receiveSource = SessionUtil.GetDestination(session, "CLOUND_SYNC");
                    // Create a consumer and producer
                    using (IMessageConsumer consumer = session.CreateConsumer(receiveSource))
                    {
                        // Start the connection so that messages will be processed.
    
                        connection.Start();
                        while (true)
                        {
                            //消费者一直在,若队列没有消息,会阻塞,不返回
                            ITextMessage revMessage = consumer.Receive() as ITextMessage;
    
                            //超过这个时间,队列里没有消息,则会返回null
                            //ITextMessage revMessage = consumer.Receive(new TimeSpan(0, 0, 30)) as ITextMessage; 
    
                            if (revMessage == null)
                            {
                                Console.WriteLine("接收数据" + "No message received!");
                                continue;
                                //break;
                            }
                            else
                            {
                                Console.WriteLine("Received message:" + revMessage.NMSCorrelationID + "+" + revMessage.Text);
                                Console.WriteLine(revMessage.Text);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
                finally
                {
                    if (connection != null)
                    {
                        connection.Close();
                    }
                }
            }
    
    
        }
    View Code

    以上是消费者的同步消费数据,也可以改为异步

    public class TestMain
    {
        protected static AutoResetEvent semaphore = new AutoResetEvent(false);
        protected static ITextMessage message = null;
        protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
    
        public static void Main(string[] args)
        {
            // Example connection strings:
            //    activemq:tcp://activemqhost:61616
            //    stomp:tcp://activemqhost:61613
            //    ems:tcp://tibcohost:7222
            //    msmq://localhost
    
            Uri connecturi = new Uri("activemq:tcp://activemqhost:61616");
    
            Console.WriteLine("About to connect to " + connecturi);
    
            // NOTE: ensure the nmsprovider-activemq.config file exists in the executable folder.
            IConnectionFactory factory = new NMSConnectionFactory(connecturi);
    
            using(IConnection connection = factory.CreateConnection())
            using(ISession session = connection.CreateSession())
            {
               
                IDestination destination = SessionUtil.GetDestination(session, "queue://FOO.BAR");
    
                Console.WriteLine("Using destination: " + destination);
    
                // Create a consumer and producer
                using(IMessageConsumer consumer = session.CreateConsumer(destination))
                using(IMessageProducer producer = session.CreateProducer(destination))
                {
                    // Start the connection so that messages will be processed.
                    connection.Start();
            producer.DeliveryMode = MsgDeliveryMode.Persistent;
                    producer.RequestTimeout = receiveTimeout;
    
                    consumer.Listener += new MessageListener(OnMessage);
    
                    // Send a message
                    ITextMessage request = session.CreateTextMessage("Hello World!");
                    request.NMSCorrelationID = "abc";
                    request.Properties["NMSXGroupID"] = "cheese";
                    request.Properties["myHeader"] = "Cheddar";
    
                    producer.Send(request);
    
                    // Wait for the message
                    semaphore.WaitOne((int) receiveTimeout.TotalMilliseconds, true);
    
                    if(message == null)
                    {
                        Console.WriteLine("No message received!");
                    }
                    else
                    {
                        Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                        Console.WriteLine("Received message with text: " + message.Text);
                    }
                }
            }
        }
    
        protected static void OnMessage(IMessage receivedMsg)
        {
            message = receivedMsg as ITextMessage;
            semaphore.Set();
        }
    }
    View Code

    二、ActiveMQ Topic

    Topic和Queue类似,不过生产者发送的消息会被多个消费者接收,保证每个订阅的消费者都会接收到消息。

    管理平台可以看到每条Topic消息有两个记录值,一个是订阅的消费者数量,一个是已经接收的消费者数量。

    发布者

     class ActiveMQPub
        {
            /// <summary>
            /// 发布消息
            /// </summary>
            public static void Pub()
            {
                try
                {
                    //Create the Connection Factory  
                    IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                    using (IConnection connection = factory.CreateConnection())
                    {
                        //Create the Session  
                        using (ISession session = connection.CreateSession())
                        {
                            IDestination sendDestination = SessionUtil.GetTopic(session, "testingTopic");  //发送目的地
                            //Create the Producer for the topic/queue  
                            IMessageProducer prod = session.CreateProducer(sendDestination);
    
                            //Send Messages  
                            int i = 0;
                            while (!Console.KeyAvailable)
                            {
                                ITextMessage msg = prod.CreateTextMessage();
                                msg.Text = i.ToString();
                                Console.WriteLine("Sending: " + i.ToString());
                                prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
    
                                System.Threading.Thread.Sleep(5000);
                                i++;
                            }
                        }
                    }
                    Console.ReadLine();
                }
                catch (System.Exception e)
                {
                    Console.WriteLine("{0}", e.Message);
                    Console.ReadLine();
                }
            }
        }
    View Code

    订阅者

    class ActiveMQSub
        {
            /// <summary>
            /// 订阅者。可以启动多个实例,从启动时开始接受消息的,之前发布者发布的的消息是获取不到的。
            /// </summary>
            public static void Sub()
            {
                try
                {
                    //Create the Connection factory  
                    IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
    
                    //Create the connection  
                    using (IConnection connection = factory.CreateConnection())
                    {
                        connection.ClientId = "testing time=" + DateTime.Now.ToString(); //区分不同的客户端(订阅者)
                        connection.Start();
    
                        //Create the Session  
                        using (ISession session = connection.CreateSession())
                        {
                            IDestination sendDestination = SessionUtil.GetTopic(session, "testingTopic");  //发送目的地
                            //Create the Consumer  
                            IMessageConsumer consumer = session.CreateConsumer(sendDestination);
    
                            consumer.Listener += new MessageListener(consumer_Listener);
    
                            Console.ReadLine();
                        }
                        connection.Stop();
                        connection.Close();
                    }
                }
                catch (System.Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
    
            static void consumer_Listener(IMessage message)
            {
                try
                {
                    ITextMessage msg = (ITextMessage)message;
                    Console.WriteLine("Receive: " + msg.Text);
                }
                catch (System.Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
    
        }
    View Code

    三、消息的持久性

    为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息传输一般都会采用持久化机制。

    ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的:

    就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。

    消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去

    持久化机制

    ActiveMQ默认采用持久化到文件机制(kahaDB),只要在发消息时设置消息为持久化就可以了。

    打开安装目录下的配置文件:

    D:ActiveMQapache-activemqconfactivemq.xml在约80行会发现默认的配置项:

    <persistenceAdapter>
         <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

    注意这里使用的是kahaDB,是一个基于文件支持事务的消息存储器,是一个可靠,高性能,可扩展的消息存储器

    他的设计初衷就是使用简单并尽可能的快。KahaDB的索引使用一个transaction log,并且所有的destination只使用一个index,有人测试表明:如果用于生产环境,支持1万个active connection,每个connection有一个独立的queue。该表现已经足矣应付大部分的需求。

    然后再发送消息的时候改变第二个参数为:

    MsgDeliveryMode.Persistent

    Message保存方式有2种
    0:PERSISTENT:【默认就是持久化的】保存到磁盘,consumer消费之后,message被删除。
    1:NON_PERSISTENT:保存到内存,消费之后message被清除。
    注意:堆积的消息太多可能导致内存溢出。

    更多参考:ActiveMQ的消息持久化机制

    使用Queue时

    使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可然后,随便一个消费者取走处理即可。某个消费者关掉一阵子,也无所谓。

    producer.DeliveryMode = MsgDeliveryMode.Persistent;  //不过这个是默认的,不设置也可以

    使用Topic时

    使用topic,即订阅时,每个消息可以有多个消费者,就麻烦一些。

    对于topic的消息,有两种订阅类型:Durable Subscribers 和 NonDurable Subscribers

    一般的订阅,订阅者必须时刻处于活跃状态,才不会遗漏任何信息;持久性订阅,当订阅者处于非活动状态时,代理会为它们保留信息,下一次连接之后推送给它们。

    特点:

    • 持久订阅者和非持久订阅者针对的Domain是Pub/Sub,而不是P2P
    • 当Broker发送消息给订阅者时,如果订阅者处于 inactive (离线)状态:持久订阅者可以收到消息,而非持久订阅者则收不到消息。

    当持久订阅者处于 inactive 状态时,Broker需要为持久订阅者保存消息;会造成的影响是:如果持久订阅者订阅的消息太多则会溢出。(当消息投递成功之后,Broker就可以把消息删除了)

    持久性订阅需要:
    1. 为Connection指定一个唯一的ClientID
      - 在这里,Connection有客户端的含义
      - ClientID的变化,将被视为不同的客户端
    2. 创建Subscriber时,指定一个name
      - name的变化,将被视为不同的订阅者

    订阅端代码:

    //Create the Connection factory  
                    IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
    
                    //Create the connection  
                    using (IConnection connection = factory.CreateConnection())
                    {
                        string clientName = "testing time";
                        connection.ClientId = clientName; //区分不同的客户端(订阅者)。持久订阅需要设置这个。
                        connection.Start();
    
                        //Create the Session  
                        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                        {
                            //IDestination sendDestination = SessionUtil.GetTopic(session, "testingTopic");  //发送目的地
                            //Create the Consumer  
                            //IMessageConsumer consumer = session.CreateConsumer(sendDestination); //普通订阅
    
                            ITopic topic = SessionUtil.GetTopic(session, "testingTopic");  //发送目的地
                            IMessageConsumer consumer = session.CreateDurableConsumer(topic, clientName,null,false); //持久化订阅
    
                            consumer.Listener += new MessageListener(consumer_Listener);
    
                            Console.ReadLine();
                        }
                        connection.Stop();
                        connection.Close();
                    }
    View Code

    CreateDurableConsumer 创建持久化订阅者,根据ClientId 去区分不同的客户端。

    关闭订阅者,再从新开启,之前的信息同样可以获取到。

    注意:使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错

    参考:如何实现ActiveMq的Topic的持久订阅 

    四、ActiveMQ-类型转换问题

    protected static void OnMessage(IMessage receivedMsg)
            {
                Console.WriteLine(receivedMsg);
                message = receivedMsg as ITextMessage;
                semaphore.Set();
            }

    receivedMsg是有值的,然而转成ItextMessage后,message为null了。

    调试发现receivedMsg的类型不是ItextMessage ,而是ActiveMQBytesMessage。

  • 相关阅读:
    sql优化的几种方法
    webService使用
    Springboot 项目pom.xml 配置文件以及启动类的一些配置
    WebSocket消息推送(群发和指定到个人)
    有n级台阶,每次爬1或者2级台阶,用程序计算有多少种爬法
    对两个字符串进行比较,取出两个字符串中一样部分的长度
    java 日志 log4j
    进制转换
    java 调用第三方动态链接库
    如何使用 OpenFileDialog 组件
  • 原文地址:https://www.cnblogs.com/peterYong/p/11162548.html
Copyright © 2011-2022 走看看