zoukankan      html  css  js  c++  java
  • Active MQ C#实现

    原文链接:

    Active MQ C#实现 

    内容概要

    主要以源码的形式介绍如何用C#实现同Active MQ 的通讯。本文假设你已经正确安装JDK1.6.x,了解Active MQ并有一定的编程基础。

    正文

        JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS 程序格式的消息。

    Message 由消息头,属性和消息体三部份组成。

        Active MQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。

    示例代码

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Windows;
    using System.Windows.Controls;
    using System.Windows.Data;
    using System.Windows.Documents;
    using System.Windows.Input;
    using System.Windows.Media;
    using System.Windows.Media.Imaging;
    using System.Windows.Navigation;
    using System.Windows.Shapes;
    
    using Apache.NMS;
    using System.Diagnostics;
    using Apache.NMS.Util;
    using System.Windows.Threading;
    
    /*
     * 功能描述:C#使用ActiveMQ示例
     * 修改次数:2
     * 最后更新: by Kagula,2012-07-31
     * 
     * 前提条件:
     * [1]apache-activemq-5.4.2
     * [2]Apache.NMS.ActiveMQ-1.5.6-bin
     * [3]WinXP SP3
     * [4]VS2008 SP1
     * [5]WPF工程 With .NET Framework 3.5
     * 
     * 启动
     *  
     * 不带安全控制方式启动
     * [你的解压路径]apache-activemq-5.4.2inactivemq.bat
     * 
     * 安全方式启动
     * 添加环境变量:            ACTIVEMQ_ENCRYPTION_PASSWORD=activemq
     * [你的解压路径]apache-activemq-5.4.2in>activemq xbean:file:../conf/activemq-security.xml
     * 
     * Active MQ 管理地址
     * http://127.0.0.1:8161/admin/
     * 添加访问"http://127.0.0.1:8161/admin/"的限制
     * 
     * 第一步:添加访问限制
     * 修改D:apacheapache-activemq-5.4.2confjetty.xml文件
     * 下面这行编码,原
     * <property name="authenticate" value="true" />
     * 修改为
     * <property name="authenticate" value="false" />
     * 
     * 第二步:修改登录用户名密码,缺省分别为admin,admin
     * D:apacheapache-activemq-5.4.2confjetty-realm.properties
     * 
     * 用户管理(前提:以安全方式启动ActiveMQ)
     * 
     * 在[你的解压路径]apache-activemq-5.4.2confcredentials.properties文件中修改默认的用户名密码
     * 在[你的解压路径]apache-activemq-5.4.2confactivemq-security.xml文件中可以添加新的用户名
     * e.g.  添加oa用户,密码同用户名。
     * <authenticationUser username="oa" password="oa" groups="users,admins"/>
     * 
     * 在[你的解压路径]apache-activemq-5.4.2confactivemq-security.xml文件中你还可以设置指定的Topic或Queue
     * 只能被哪些用户组read 或 write。
     * 
     * 
     * 配置C# with WPF项目
     * 项目的[Application]->[TargetFramework]属性设置为[.NETFramework 3.5](这是VS2008WPF工程的默认设置)
     * 添加[你的解压路径]Apache.NMS.ActiveMQ-1.5.6-binlibApache.NMS
    et-3.5Apache.NMS.dll的引用
     * Apache.NMS.dll相当于接口
     * 
     * 如果是以Debug方式调试
     * 把[你的解压路径]Apache.NMS.ActiveMQ-1.5.6-binuild
    et-3.5debug目录下的
     * Apache.NMS.ActiveMQ.dll文件复制到你项目的Debug目录下
     * Apache.NMS.ActiveMQ.dll相当于实现
     * 
     * 如果是以Release方式调试
     * 参考上文,去取Apache.NMS,Release目录下相应的DLL文件,并复制到你项目的Release目录下。
     * 
     * 
     * 参考资料
     * [1]《C#调用ActiveMQ官方示例》 http://activemq.apache.org/nms/examples.html
     * [2]《ActiveMQ NMS下载地址》http://activemq.apache.org/nms/activemq-downloads.html
     * [3]《Active MQ在C#中的应用》http://www.cnblogs.com/guthing/archive/2010/06/17/1759333.html
     * [4]《NMS API Reference》http://activemq.apache.org/nms/nms-api.html
     */
    
    namespace testActiveMQSubscriber
    {
        /// <summary>
        /// Interaction logic for Window1.xaml
        /// </summary>
        public partial class Window1 : Window
        {
            private static IConnectionFactory connFac;
    
            private static IConnection connection;
            private static ISession session;
            private static IDestination destination;
            private static IMessageProducer producer;
            private static IMessageConsumer consumer;
    
    
            protected static ITextMessage message = null;
    
            public Window1()
            {
                InitializeComponent();
    
                initAMQ("MyFirstTopic");
            }
    
                     
            private void initAMQ(String strTopicName)
            {
                try
                {
                    connFac = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));
    
                    //新建连接
                    //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码
                    
                    //如果你要持久“订阅”,则需要设置ClientId,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息!
                    connection.ClientId = "testing listener";
                    connection = connFac.CreateConnection();//如果你是缺省方式启动Active MQ服务,则不需填用户名、密码
    
                    //创建Session
                    session = connection.CreateSession();
    
                    //发布/订阅模式,适合一对多的情况
                    destination = SessionUtil.GetDestination(session, "topic://" + strTopicName);
    
                    //新建生产者对象
                    producer = session.CreateProducer(destination);
                    producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留
    
                    //新建消费者对象:普通“订阅”模式
                    //consumer = session.CreateConsumer(destination);//不需要持久“订阅”       
    
                    //新建消费者对象:持久"订阅"模式:
                    //    持久“订阅”后,如果你的程序被停止工作后,恢复运行,
                    //从第一次持久订阅开始,没收到的消息还可以继续收
                    consumer = session.CreateDurableConsumer(
                        session.GetTopic(strTopicName)
                        , connection.ClientId, null, false);
    
                    //设置消息接收事件
                    consumer.Listener += new MessageListener(OnMessage);
    
                    //启动来自Active MQ的消息侦听
                    connection.Start();
                }
                catch (Exception e)
                {
                    //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!
                    Debug.WriteLine(e.Message);
                }
            }
    
            private void SendMsg2Topic_Click(object sender, RoutedEventArgs e)
            {
                //发送消息
                ITextMessage request = session.CreateTextMessage(DateTime.Now.ToLocalTime()+" "+tbMsg.Text);
                producer.Send(request);  
            }
    
            protected void OnMessage(IMessage receivedMsg)
            {
                //接收消息
                message = receivedMsg as ITextMessage;
    
                //UI线程,显示收到的消息
                Dispatcher.Invoke(DispatcherPriority.Normal, new Action(() =>
                {
                    DateTime dt = new DateTime();
                    ListBoxItem lbi = new ListBoxItem();
                    lbi.Content = DateTime.Now.ToLocalTime() + " " + message.Text;
    
                    lbR.Items.Add(lbi);
                }));
            }
        }
    }

    队列通讯方式,消费者例子

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using Apache.NMS;
    using System.Diagnostics;
    using log4net;
    using Apache.NMS.Util;
    using System.Collections;
    
    namespace Cat8637AutoCallServer
    {
        public class SMTask
        {
            public String Callee { get; set; }
            public String CheckNumber { get; set; }
            public int Deadline { get; set; }
    
            public override String ToString() 
            {
                return String.Format("Callee={0},CheckNumber={1},Deadline={2}",
                    Callee,CheckNumber,Deadline);
            }
        }
    
        /*
         * 负责接收任务,并把任务放在任务等待队列中。
         */
        public class MQClient
        {
            private static readonly ILog logger = LogManager.GetLogger(typeof(MQClient));
    
            private static IConnection connection = null;
            private static ISession session = null;
    
    
            Queue _voiceSMTasks = new Queue(); 
    
            public MQClient()
            {
                try
                {
                    IConnectionFactory factory = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));
    
                    //新建连接  
                    //connection = connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码  
                    connection = factory.CreateConnection();
    
                    session = connection.CreateSession();
                    IMessageConsumer consumer = session.CreateConsumer(session.GetQueue("TaskIssue_VoiceSM"));  
                    consumer.Listener += new MessageListener(OnMessage);
    
                    connection.Start();
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }
    
            protected void OnMessage(IMessage receivedMsg)
            {
                IMessage message = receivedMsg as ITextMessage;
    
                SMTask smTask = new SMTask();
                smTask.Callee = message.Properties["Callee"] as String;
                smTask.CheckNumber = message.Properties["Message"] as String;
                smTask.Deadline = Convert.ToInt32(message.Properties["deadline"] as String);
    
                logger.Info("Received: "+smTask.ToString());
    
                lock (_voiceSMTasks)
                {
                    _voiceSMTasks.Enqueue(smTask);
                }
            }
    
            public SMTask GetVoiceSMTask()
            {
                SMTask result = null;
                lock (_voiceSMTasks)
                {
                    if (_voiceSMTasks.Count > 0)
                    {
                        result = _voiceSMTasks.Dequeue() as SMTask;
                    }
                }
                return result;
            }
        }
    }

    队列通讯方式,生产者例子

            private void Send_Click(object sender, RoutedEventArgs e)
            {
                try
                {
                    IDestination destination = SessionUtil.GetDestination(session, "queue://TaskIssue_VoiceSM");
    
                    //新建生产者对象  
                    IMessageProducer producer = session.CreateProducer(destination);
                    producer.DeliveryMode = MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留  
    
                    ITextMessage request = session.CreateTextMessage();
                    request.NMSCorrelationID = "TestVoiceSM";//这里我填了应用程序的名称。
    
                    request.Properties["Callee"] = tbCallee.Text;
                    request.Properties["Message"] = tbCheckNumber.Text;
                    request.Properties["deadline"] = tbValidDuration.Text;
    
                    producer.Send(request);
    
                }
                catch (Exception ex)
                {
                    //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息!  
                    Debug.WriteLine(ex.Message);
                }  
            }
    
            private void Window_Closed(object sender, EventArgs e)
            {
                try
                {
                    if (session == null)
                        return;
                    //if (connection == null)
                    //    return;
    
                    session.Close();
                    //connection.Close();
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }
  • 相关阅读:
    今日进度
    今日进度
    今日进度
    今日进度
    pandas连接MySQL和impala
    sql语句获取今天、昨天、近7天、本周、上周、本月、上月、半年数据
    Python报错 ValueError: arrays must all be same length
    Python 连接 impala
    Test
    Selective Search for Object Recognition
  • 原文地址:https://www.cnblogs.com/wuling129/p/5150681.html
Copyright © 2011-2022 走看看