zoukankan      html  css  js  c++  java
  • ActiveMQ使用记录

    1、在Linux中安装ActiveMQ

    官方文档地址:http://activemq.apache.org/getting-started.html#GettingStarted-StartingActiveMQStartingActiveMQ

    (1)、(Ubuntu中)先更新,执行 sudo apt-get update

    (2)、需要Java环境,使用java –version命令显示版本信息,没有JAVA自己安装配置去 - -

    (3)、 下载activemq,执行: wget http://activemq.apache.org/path/tofile/apache-activemq-x.x.x-bin.tar.gz

    (4)、解压,执行: cd [activemq_install_dir]      tar zxvf activemq-x.x.x-bin.tar.gz

    (5)、运行 :cd [activemq_install_dir]/bin   ./activemq start(我这步没有成功,然后看文件目录下有linux-x86-32/linux-x86-64,我是64位系统,然后进入linux-x86-64下,执行./activemq start,运行成功!

    (6)、输入HTTP://IP:8161  admin  admin进入管理!  一定要输入HTTP://   一定要输入HTTP://    一定要输入HTTP://  才能访问!!

     

    2、使用延时消息特性 

    (该部分转自:http://www.voidcn.com/blog/doffs/article/p-2407882.html) 

    关于MSMQ和ActiveMQ的基础功能就不再描述。这里说说ActiveMQ的比较实用的一个功能:ScheduledMessage

    ScheduledMessage可以理解为一个调度功能,可以延时、重复发送消息。我就是采用这种方案来实现Web中的定时功能和系统中不同模块之间的消息传递。

    这里主要说一下延时消息的新增、删除操作(坑啊官方API里面貌似没有这一个功能的描述,只有一全案例,而案例上所用到的东西,在NuGet上下载的dll根本没有这一个类)。

    要使用Scheduled功能需要在服务端conf/activemq.xml中把Scheduled打开,根据官网上的提示,是需要在broker上面添加一个schedulerSupport="true"属性,配置好后,重启服务。

     <!--
            The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker schedulerSupport="true" xmlns="http://activemq.apache.org/schema/core" 

    延时消息发送:

                    IConnectionFactory factory = new ConnectionFactory("tcp://192.168.1.248:61616/");
                    using (IConnection connection = factory.CreateConnection())
                    {
                        using (ISession session = connection.CreateSession())
                        {
                            IMessageProducer prod = session.CreateProducer(
                                new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
    
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = "这是一个延时消息";
                            msg.Properties.SetLong("AMQ_SCHEDULED_DELAY", 3600000); //这里设置一个延时发送,后面是延时单位是ms,
                            prod.Send(msg);
                            Console.WriteLine(msg.NMSMessageId);//发送成功后,NMSMessageId将会被赋值,如果需要删除单条消息需要把这个记录下来
    
                        }
                    }

    发送成功后,在服务器上可以看到这个延时消息

    删除这个延时消息:

     IConnectionFactory factory = new ConnectionFactory("tcp://192.168.1.248:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        IDestination management = session.GetDestination("ActiveMQ.Scheduler.Management");
                        connection.Start();
                        IMessageProducer producer = session.CreateProducer(management);
                        IMessage request = session.CreateMessage();
    
                        request.Properties.SetString("AMQ_SCHEDULER_ACTION", "REMOVE");//指定当前消息是来删除某个延时消息
                        request.Properties.SetString("scheduledJobId", id);//指定要删除的延时消息的ID、这个ID就是刚刚发送成功时返回的ID
                        producer.Send(request);
    
                    }
                }

    参考资料

    http://www.programcreek.com/java-api-examples/index.php?api=javax.jms.MessageListener

    http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/ScheduledMessage.html#line.64

    发送延时消息的简单封装:

            /// <summary>
            /// 发送文本延时消息
            /// </summary>
            /// <param name="messageProducer">The message producer.</param>
            /// <param name="msg">消息内容</param>
            /// <param name="delayTime">延时时间(单位秒)</param>
            /// <returns><c>true</c> if XXXX, <c>false</c> otherwise.</returns>
            public bool SendScheduledTextMessage(IMessageProducer messageProducer, string msg, long delayTime)
            {
                try
                {
                    var message = messageProducer.CreateTextMessage();
                    message.Properties.SetLong("AMQ_SCHEDULED_DELAY", delayTime * 1000);
                    message.Text = msg;
                    messageProducer.Send(message);
                    return true;
                }
                catch (Exception)
                {
                    return false;
                }
            }
    

    3、Send参数说明

    Send方法: 

    message.Send(msg, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
    //第一个参数:消息的内容(byte或者是string)
    //第二个参数:消息的持久化方式,这里为不进行持久。(如果为持久,可通过ActiveMQ的配置文件,配置ActiveMQ持久化的方式和位置)
    //第三个参数:优先级(优先级为0-9,0最低,9最高)。这里使用的是Normal,为5
    //第四个参数:为该条消息的过期时间,默认为0,永不过期。若设置过期时间,则消息超过设置的时间不处理,则会被丢弃到DLQ(Dead Letter Queue)队列。详细说明可参考:http://sharong.iteye.com/blog/1987171

     4、获取ActiveMQ队列里的消息个数

    这个一直在研究,但是迟迟搞不定。中文社区的资料里,基本都是JAVA的资料,使用JMS规范去获得,暂未发现.NET方面的资料。

    所以暂附上现阶段的研究成果吧。

    1、在官方的文档里,提到了如何获取这些信息。地址:http://activemq.apache.org/how-can-i-monitor-activemq.html

      根据文档发现首推的就是使用JMS,然后还有网页控制台,命令行,插件等等。不过依然没什么头绪。

    2、在Stack Overflow上找到了一个相关问题,链接:http://stackoverflow.com/questions/7512004/activemq-with-c-sharp-and-apache-nms-count-messages-in-queue  。

      (1)、根据问题的提示,是建立一个Browser,通过他一直去取消息,取到多少个代表队列里还有多少消息。暂不评论合不合适。  当我在使用的时候,发现每次都是只能取到一个。经过仔细研究之后,发现建立队列的时候,他的Message ID不同,用这种方法去取的时候,只能获取当前Message ID下的消息,所以每次都是一个。

      (2)、全局所有关于Activemq的都是使用单例模式,然并卵。。。依旧无果

      (3)、是不是配置的问题,只能访问到当前MessageID的消息?(这是一个问题)

    3、最简单粗暴的方法,既然在8161控制台里能看到个数,那么直接HTTP请求,分析RSS页面内容。。你们懂的。。   http://127.0.0.1:8161/admin/queueBrowse/[队列名]?view=rss&feedType=atom_1.0

  • 相关阅读:
    JS知识点简单总结
    Js答辩总结
    JS答辩习题
    轮播
    jQuery选择器总结
    JS的魅力
    JS与JAVA数据类型的区别
    单表查询、多表查询、虚拟表连接查询
    Mysql基本语句
    Mysql数据库
  • 原文地址:https://www.cnblogs.com/SzeCheng/p/5301013.html
Copyright © 2011-2022 走看看