zoukankan      html  css  js  c++  java
  • ActiveMq C#客户端 消息队列的使用(存和取)

    1、准备工具

     

    2、开始项目

     
    VS2013新建一个C#控制台应用程序,项目中添加两个dll引用,一个是D:Apache.NMS.ActiveMQ-1.7.2-binlibApache.NMS et-4.0目录下的Apache.NMS.dll,另一个是D:Apache.NMS.ActiveMQ-1.7.2-binuild et-4.0debug目录下的Apache.NMS.ActiveMQ.dll。
    新建一个类,MyActiveMq.cs,用于对activemq消息队列接口的封装,实现如下:
    [csharp] view plain copy
     
    1. using System;  
    2. using System.Collections.Generic;  
    3. using System.Linq;  
    4. using System.Text;  
    5. using System.Threading.Tasks;  
    6.   
    7. using Apache.NMS;  
    8. using Apache.NMS.ActiveMQ;  
    9.   
    10. namespace NmsProducerClasses  
    11. {  
    12.     public class MyActiveMq  
    13.     {  
    14.         private IConnectionFactory factory;  
    15.         private IConnection connection;  
    16.         private ISession session;  
    17.         private IMessageProducer prod;  
    18.         private IMessageConsumer consumer;  
    19.         private ITextMessage msg;  
    20.   
    21.         private bool isTopic = false;  
    22.         private bool hasSelector = false;  
    23.         private const string ClientID = "clientid";  
    24.         private const string Selector = "filter='demo'";  
    25.         private bool sendSuccess = true;  
    26.         private bool receiveSuccess = true;  
    27.   
    28.         public MyActiveMq(bool isLocalMachine, string remoteAddress)  
    29.         {  
    30.             try  
    31.             {  
    32.                 //初始化工厂     
    33.                 if (isLocalMachine)  
    34.                 {  
    35.                     factory = new ConnectionFactory("tcp://localhost:61616/");  
    36.                 }  
    37.                 else  
    38.                 {  
    39.                     factory = new ConnectionFactory("tcp://" + remoteAddress + ":61616/"); //写tcp://192.168.1.111:61616的形式连接其他服务器上的ActiveMQ服务器             
    40.                 }  
    41.                 //通过工厂建立连接  
    42.                 connection = factory.CreateConnection();  
    43.                 connection.ClientId = ClientID;  
    44.                 connection.Start();  
    45.                 //通过连接创建Session会话  
    46.                 session = connection.CreateSession();  
    47.             }  
    48.             catch (System.Exception e)  
    49.             {  
    50.                 sendSuccess = false;  
    51.                 receiveSuccess = false;  
    52.                 Console.WriteLine("Exception:{0}", e.Message);  
    53.                 Console.ReadLine();  
    54.                 throw e;  
    55.             }  
    56.             Console.WriteLine("Begin connection...");  
    57.         }  
    58.   
    59.   
    60.         ~MyActiveMq()  
    61.         {  
    62.             //this.ShutDown();  
    63.         }  
    64.   
    65.         /// <summary>  
    66.         /// 初始化  
    67.         /// </summary>  
    68.         /// <param name="topic">选择是否是Topic</param>  
    69.         /// <param name="name">队列名</param>  
    70.         /// <param name="selector">是否设置过滤</param>  
    71.         public bool InitQueueOrTopic(bool topic, string name, bool selector = false)  
    72.         {  
    73.             try  
    74.             {  
    75.                 //通过会话创建生产者、消费者  
    76.                 if (topic)  
    77.                 {  
    78.                     prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name));  
    79.                     if (selector)  
    80.                     {  
    81.                         consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, Selector, false);  
    82.                         hasSelector = true;  
    83.                     }  
    84.                     else  
    85.                     {  
    86.                         consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, null, false);  
    87.                         hasSelector = false;  
    88.                     }  
    89.                     isTopic = true;  
    90.                 }  
    91.                 else  
    92.                 {  
    93.                     prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));  
    94.                     if (selector)  
    95.                     {  
    96.                         consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name), Selector);  
    97.                         hasSelector = true;  
    98.                     }  
    99.                     else  
    100.                     {  
    101.                         consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));  
    102.                         hasSelector = false;  
    103.                     }  
    104.                     isTopic = false;  
    105.                 }  
    106.                 //创建一个发送的消息对象  
    107.                 msg = prod.CreateTextMessage();  
    108.             }  
    109.             catch (System.Exception e)  
    110.             {  
    111.                 sendSuccess = false;  
    112.                 receiveSuccess = false;  
    113.                 Console.WriteLine("Exception:{0}", e.Message);  
    114.                 Console.ReadLine();  
    115.                 throw e;  
    116.             }  
    117.   
    118.             return sendSuccess;  
    119.         }  
    120.   
    121.   
    122.         public bool SendMessage(string message, string msgId = "defult", MsgPriority priority = MsgPriority.Normal)  
    123.         {  
    124.             if (prod == null)  
    125.             {  
    126.                 sendSuccess = false;  
    127.                 Console.WriteLine("call InitQueueOrTopic() first!!");  
    128.                 return false;  
    129.             }  
    130.   
    131.             Console.WriteLine("Begin send messages...");  
    132.   
    133.             //给这个对象赋实际的消息  
    134.             msg.NMSCorrelationID = msgId;  
    135.             msg.Properties["MyID"] = msgId;  
    136.             msg.NMSMessageId = msgId;  
    137.             msg.Text = message;  
    138.             Console.WriteLine(message);  
    139.   
    140.             if (isTopic)  
    141.             {  
    142.                 sendSuccess = ProducerSubcriber(message, priority);  
    143.             }  
    144.             else  
    145.             {  
    146.                 sendSuccess = P2P(message, priority);  
    147.             }  
    148.   
    149.             return sendSuccess;  
    150.         }  
    151.   
    152.   
    153.         public string GetMessage()  
    154.         {  
    155.             if (prod == null)  
    156.             {  
    157.                 Console.WriteLine("call InitQueueOrTopic() first!!");  
    158.                 return null;  
    159.             }  
    160.   
    161.             Console.WriteLine("Begin receive messages...");  
    162.             ITextMessage revMessage = null;  
    163.             try  
    164.             {  
    165.                 //同步阻塞10ms,没消息就直接返回null,注意此处时间不能设太短,否则还没取到消息就直接返回null了!!!  
    166.                 revMessage = consumer.Receive(new TimeSpan(TimeSpan.TicksPerMillisecond *10)) as ITextMessage;   
    167.             }  
    168.             catch (System.Exception e)  
    169.             {  
    170.                 receiveSuccess = false;  
    171.                 Console.WriteLine("Exception:{0}", e.Message);  
    172.                 Console.ReadLine();  
    173.                 throw e;  
    174.             }  
    175.   
    176.             if (revMessage == null)  
    177.             {  
    178.                 Console.WriteLine("No message received!");  
    179.                 return null;  
    180.             }  
    181.             else  
    182.             {  
    183.                 Console.WriteLine("Received message with Correlation ID: " + revMessage.NMSCorrelationID);  
    184.                 //Console.WriteLine("Received message with Properties'ID: " + revMessage.Properties["MyID"]);  
    185.                 Console.WriteLine("Received message with text: " + revMessage.Text);  
    186.             }  
    187.   
    188.             return revMessage.Text;  
    189.         }  
    190.   
    191.         //P2P模式,一个生产者对应一个消费者  
    192.         private bool P2P(string message, MsgPriority priority)  
    193.         {  
    194.             try  
    195.             {  
    196.                 if (hasSelector)  
    197.                 {  
    198.                     //设置消息对象的属性,这个很重要,是Queue的过滤条件,也是P2P消息的唯一指定属性  
    199.                     msg.Properties.SetString("filter", "demo");  //P2P模式  
    200.                 }  
    201.                 prod.Priority = priority;  
    202.                 //设置持久化  
    203.                 prod.DeliveryMode = MsgDeliveryMode.Persistent;  
    204.                 //生产者把消息发送出去,几个枚举参数MsgDeliveryMode是否持久化,MsgPriority消息优先级别,存活时间,当然还有其他重载  
    205.                 prod.Send(msg, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);  
    206.             }  
    207.             catch (System.Exception e)  
    208.             {  
    209.                 sendSuccess = false;  
    210.                 Console.WriteLine("Exception:{0}", e.Message);  
    211.                 Console.ReadLine();  
    212.                 throw e;  
    213.             }  
    214.   
    215.             return sendSuccess;  
    216.         }  
    217.   
    218.   
    219.         //发布订阅模式,一个生产者多个消费者   
    220.         private bool ProducerSubcriber(string message, MsgPriority priority)  
    221.         {  
    222.             try  
    223.             {  
    224.                 prod.Priority = priority;  
    225.                 //设置持久化,如果DeliveryMode没有设置或者设置为NON_PERSISTENT,那么重启MQ之后消息就会丢失  
    226.                 prod.DeliveryMode = MsgDeliveryMode.Persistent;  
    227.                 prod.Send(msg, Apache.NMS.MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);  
    228.                 //System.Threading.Thread.Sleep(1000);    
    229.             }  
    230.             catch (System.Exception e)  
    231.             {  
    232.                 sendSuccess = false;  
    233.                 Console.WriteLine("Exception:{0}", e.Message);  
    234.                 Console.ReadLine();  
    235.                 throw e;  
    236.             }  
    237.   
    238.             return sendSuccess;  
    239.         }  
    240.   
    241.   
    242.         public void ShutDown()  
    243.         {  
    244.             Console.WriteLine("Close connection and session...");  
    245.             session.Close();  
    246.             connection.Close();  
    247.         }  
    248.     }  
    249. }  


    Program.cs代码如下:
     
    [csharp] view plain copy
     
    1. using System;  
    2. using System.Collections.Generic;  
    3. using System.Linq;  
    4. using System.Text;  
    5. using System.Threading.Tasks;  
    6. using System.IO;  
    7. using System.Threading;  
    8.   
    9. namespace NmsProducerClasses  
    10. {  
    11.     class Program  
    12.     {  
    13.         static void Main(string[] args)  
    14.         {  
    15.             MyActiveMq mymq = new MyActiveMq(isLocalMachine: true, remoteAddress: "");  
    16.   
    17.             mymq.InitQueueOrTopic(topic: false, name: "myqueue", selector: false);  
    18.             //mymq.InitQueueOrTopic(topic: false, name: "seletorqueue", selector: true);   
    19.             //mymq.InitQueueOrTopic(topic: true, name: "noselectortopic", selector: false);  
    20.             //mymq.InitQueueOrTopic(topic: true, name: "selectortopic", selector: true);  
    21.   
    22.             //The full range of priority values (0-9) are supported by the JDBC message store. For KahaDB three priority categories are supported, Low (< 4), Default (= 4) and High (> 4).  
    23.             User myuser0 = new User("0000", "Lowest", "img/p.jpg");  
    24.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser0), "newid", priority: Apache.NMS.MsgPriority.Lowest);  
    25.             User myuser1 = new User("1111", "AboveLow", "img/p.jpg");  
    26.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser1), "newid", priority: Apache.NMS.MsgPriority.AboveLow);  
    27.             User myuser2 = new User("2222", "AboveNormal", "img/p.jpg");  
    28.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser2), "newid", priority: Apache.NMS.MsgPriority.AboveNormal);  
    29.             User myuser3 = new User("0000", "BelowNormal", "img/p.jpg");  
    30.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser3), "newid", priority: Apache.NMS.MsgPriority.BelowNormal);  
    31.             User myuser4 = new User("1111", "High", "img/p.jpg");  
    32.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser4), "newid", priority: Apache.NMS.MsgPriority.High);  
    33.             User myuser5 = new User("2222", "Highest", "img/p.jpg");  
    34.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser5), "newid", priority: Apache.NMS.MsgPriority.Highest);  
    35.             User myuser6 = new User("0000", "Low", "img/p.jpg");  
    36.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser6), "newid", priority: Apache.NMS.MsgPriority.Low);  
    37.             User myuser7 = new User("1111", "Normal", "img/p.jpg");  
    38.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser7), "newid", priority: Apache.NMS.MsgPriority.Normal);  
    39.             User myuser8 = new User("2222", "VeryHigh", "img/p.jpg");  
    40.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser8), "newid", priority: Apache.NMS.MsgPriority.VeryHigh);  
    41.             User myuser9 = new User("2222", "VeryLow", "img/p.jpg");  
    42.             mymq.SendMessage(JsonUtil.ObjectToJson(myuser8), "newid", priority: Apache.NMS.MsgPriority.VeryLow);  
    43.   
    44.             int num = 20;  
    45.             while (num-- > 0)  
    46.             {  
    47.                 mymq.GetMessage();  
    48.                 //Thread.Sleep(1000);  
    49.             }  
    50.             mymq.ShutDown();  
    51.               
    52.   
    53.             //XML测试  
    54.             //string xml = XmlTest.ObjToXml();  
    55.             //Console.WriteLine("ObjToXml: {0}", xml);  
    56.   
    57.             //Json测试  
    58.             //User u = new User() { Id="88", Imgurl="img/88.jpg", Name="haha88"};  
    59.             //string jsonstr = JsonUtil.ObjectToJson(u);  
    60.             //Console.WriteLine(jsonstr);  
    61.               
    62.         }  
    63.   
    64.     }  

    3、测试

    首先,需要启动消息队列,具体启动及测试消息队列步骤可见这边:点击打开链接
    然后,运行项目,运行结果如下:
     
     
     
     

    4、优先级

    priority并不能决定消息传送的严格消息,具体原因可见

    优先级设置:

    在D:apache-activemq-5.14.0conf目录的activemq.xml配置文件中,找到<destinationPolicy>标签,在其中的<policyEntries>标签下添加
    [html] view plain copy
     
    1. <policyEntry queue=">"  producerFlowControl="false" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />    
    2. <policyEntry queue=">" strictOrderDispatch="false" />    
    3. <policyEntry queue=">" >    
    4.               <pendingMessageLimitStrategy>    
    5.                   <constantPendingMessageLimitStrategy limit="0"/>    
    6.               </pendingMessageLimitStrategy>    
    7.               <messageEvictionStrategy>    
    8.                   <oldestMessageWithLowestPriorityEvictionStrategy/>    
    9.               </messageEvictionStrategy>    
    10. </policyEntry>    
     
    配置完成后,需要重启activemq
     
     

    5、远程登录监控

    要实现远程监控服务器消息队列,需要先进行配置。
    配置方法:在D:apache-activemq-5.14.0conf目录的jetty.xml配置文件中,把133开始的那段注释去掉即可。
     
  • 相关阅读:
    verilog中timescale
    [shell] if语句用法
    makefile编写
    linux下压缩解压缩命令
    python获取文件所在目录
    gvim 技巧
    vcs编译verilog/sysverilog并执行
    verilog中signed的使用
    [leetcode]_String to Integer (atoi)
    [leetcode]_Minimum Depth of Binary Tree
  • 原文地址:https://www.cnblogs.com/zxtceq/p/8558740.html
Copyright © 2011-2022 走看看