zoukankan      html  css  js  c++  java
  • ActiveMQ发布订阅模式 转发 https://www.cnblogs.com/madyina/p/4127144.html

    ActiveMQ的另一种模式就SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思。意思就是一个来源分到N个出口。还是上节的例子,当一个订单产生后,后台N个系统需要联动,但有一个前提是都需要收到订单信息,那么我们就需要将一个生产者的消息发布到N个消费者。

    生产者:

    复制代码
                try
                {
                    //Create the Connection Factory  
                    IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                    using (IConnection connection = factory.CreateConnection())
                    {
                        //Create the Session  
                        using (ISession session = connection.CreateSession())
                        {
                            //Create the Producer for the topic/queue  
                            IMessageProducer prod = session.CreateProducer(
                                new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
    
                            //Send Messages  
                            int i = 0;
    
                            while (!Console.KeyAvailable)
                            {
                                ITextMessage msg = prod.CreateTextMessage();
                                msg.Text = i.ToString();
                                Console.WriteLine("Sending: " + i.ToString());
                                prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
    
                                System.Threading.Thread.Sleep(5000);
                                i++;
                            }
                        }
                    }
    
                    Console.ReadLine();
                }
                catch (System.Exception e)
                {
                    Console.WriteLine("{0}", e.Message);
                    Console.ReadLine();
                }
    复制代码

    假设生产者每5秒发送一次消息:

    wps3E59.tmp

    消费者:

    复制代码
            static void Main(string[] args)
            {
                try  
                {  
                    //Create the Connection factory  
                    IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");  
                      
                    //Create the connection  
                    using (IConnection connection = factory.CreateConnection())  
                    {  
                        connection.ClientId = "testing listener1";  
                        connection.Start();  
      
                        //Create the Session  
                        using (ISession session = connection.CreateSession())  
                        {  
                            //Create the Consumer  
                            IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener1", null, false);  
                              
                            consumer.Listener += new MessageListener(consumer_Listener);  
      
                            Console.ReadLine();  
                        }  
                        connection.Stop();  
                        connection.Close();  
                    }  
                }  
                catch (System.Exception e)  
                {  
                    Console.WriteLine(e.Message);  
                }  
            }  
      
            static void consumer_Listener(IMessage message)  
            {  
                try  
                {  
                    ITextMessage msg = (ITextMessage)message;  
                    Console.WriteLine("Receive: " + msg.Text);  
               }  
                catch (System.Exception e)  
                {  
                    Console.WriteLine(e.Message);  
                }  
            }
    复制代码

    启动一个消费者:

    wps3E5A.tmp

    我们发现他是从15开始的,而不是像上节一样从头开始,再启动另一个消费者:

    wps3E5B.tmp

    我们发现就是从启动时开始接受消息的,之前的消息就丢失了。

    整体状态如下:

    wps3E6B.tmp

    我们观察管理界面:

    wps3E6C.tmp

    产生了一个testing的Topics,而订阅方有2个都订阅的是testing:

    wps3E6D.tmp

    这样只需要在需要获取消息的地方订阅即可及时获得。

    源代码下载

  • 相关阅读:
    SpringBoot_web开发-【实验】-员工列表-公共页抽取
    下载 Linux 内核的脚本
    uboot 修改代码 增加 环境变量
    Linux FHS
    Redis源码分析(二十五)--- zmalloc内存分配实现
    Redis源码分析(二十四)--- tool工具类(2)
    Redis源码分析(二十四)--- tool工具类(2)
    数据结构(二)——排序
    数据结构(一)——树
    python(三)——while语句
  • 原文地址:https://www.cnblogs.com/Jeely/p/10784835.html
Copyright © 2011-2022 走看看