1、在Linux中安装ActiveMQ
官方文档地址:http://activemq.apache.org/getting-started.html#GettingStarted-StartingActiveMQStartingActiveMQ
(1)、(Ubuntu中)先更新,执行 sudo apt-get update
(2)、需要Java环境,使用java –version命令显示版本信息,没有JAVA自己安装配置去 - -
(3)、 下载activemq,执行: wget http:
//activemq.apache.org/path/tofile/apache-activemq-x.x.x-bin.tar.gz
(4)、解压,执行: cd [activemq_install_dir]
tar zxvf activemq-x.x.x-bin.tar.gz
(5)、运行 :cd [activemq_install_dir]/bin
./activemq start(我这步没有成功,然后看文件目录下有linux-x86-32/linux-x86-64,我是64位系统,然后进入linux-x86-64下,执行./activemq start,运行成功!)
(6)、输入HTTP://IP:8161 admin admin进入管理! 一定要输入HTTP:// 一定要输入HTTP:// 一定要输入HTTP:// 才能访问!!
2、使用延时消息特性
(该部分转自:http://www.voidcn.com/blog/doffs/article/p-2407882.html)
关于MSMQ和ActiveMQ的基础功能就不再描述。这里说说ActiveMQ的比较实用的一个功能:ScheduledMessage
ScheduledMessage可以理解为一个调度功能,可以延时、重复发送消息。我就是采用这种方案来实现Web中的定时功能和系统中不同模块之间的消息传递。
这里主要说一下延时消息的新增、删除操作(坑啊官方API里面貌似没有这一个功能的描述,只有一全案例,而案例上所用到的东西,在NuGet上下载的dll根本没有这一个类)。
要使用Scheduled功能需要在服务端conf/activemq.xml中把Scheduled打开,根据官网上的提示,是需要在broker上面添加一个schedulerSupport="true"属性,配置好后,重启服务。
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker schedulerSupport="true" xmlns="http://activemq.apache.org/schema/core"
延时消息发送:
IConnectionFactory factory = new ConnectionFactory("tcp://192.168.1.248:61616/");
using (IConnection connection = factory.CreateConnection())
{
using (ISession session = connection.CreateSession())
{
IMessageProducer prod = session.CreateProducer(
new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
ITextMessage msg = prod.CreateTextMessage();
msg.Text = "这是一个延时消息";
msg.Properties.SetLong("AMQ_SCHEDULED_DELAY", 3600000); //这里设置一个延时发送,后面是延时单位是ms,
prod.Send(msg);
Console.WriteLine(msg.NMSMessageId);//发送成功后,NMSMessageId将会被赋值,如果需要删除单条消息需要把这个记录下来
}
}
发送成功后,在服务器上可以看到这个延时消息
删除这个延时消息:
IConnectionFactory factory = new ConnectionFactory("tcp://192.168.1.248:61616/");
using (IConnection connection = factory.CreateConnection())
{
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
IDestination management = session.GetDestination("ActiveMQ.Scheduler.Management");
connection.Start();
IMessageProducer producer = session.CreateProducer(management);
IMessage request = session.CreateMessage();
request.Properties.SetString("AMQ_SCHEDULER_ACTION", "REMOVE");//指定当前消息是来删除某个延时消息
request.Properties.SetString("scheduledJobId", id);//指定要删除的延时消息的ID、这个ID就是刚刚发送成功时返回的ID
producer.Send(request);
}
}
参考资料
http://www.programcreek.com/java-api-examples/index.php?api=javax.jms.MessageListener
http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/ScheduledMessage.html#line.64
发送延时消息的简单封装:
/// <summary> /// 发送文本延时消息 /// </summary> /// <param name="messageProducer">The message producer.</param> /// <param name="msg">消息内容</param> /// <param name="delayTime">延时时间(单位秒)</param> /// <returns><c>true</c> if XXXX, <c>false</c> otherwise.</returns> public bool SendScheduledTextMessage(IMessageProducer messageProducer, string msg, long delayTime) { try { var message = messageProducer.CreateTextMessage(); message.Properties.SetLong("AMQ_SCHEDULED_DELAY", delayTime * 1000); message.Text = msg; messageProducer.Send(message); return true; } catch (Exception) { return false; } }
3、Send参数说明
Send方法:
message.Send(msg, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
//第一个参数:消息的内容(byte或者是string)
//第二个参数:消息的持久化方式,这里为不进行持久。(如果为持久,可通过ActiveMQ的配置文件,配置ActiveMQ持久化的方式和位置)
//第三个参数:优先级(优先级为0-9,0最低,9最高)。这里使用的是Normal,为5
//第四个参数:为该条消息的过期时间,默认为0,永不过期。若设置过期时间,则消息超过设置的时间不处理,则会被丢弃到DLQ(Dead Letter Queue)队列。详细说明可参考:http://sharong.iteye.com/blog/1987171
4、获取ActiveMQ队列里的消息个数
这个一直在研究,但是迟迟搞不定。中文社区的资料里,基本都是JAVA的资料,使用JMS规范去获得,暂未发现.NET方面的资料。
所以暂附上现阶段的研究成果吧。
1、在官方的文档里,提到了如何获取这些信息。地址:http://activemq.apache.org/how-can-i-monitor-activemq.html。
根据文档发现首推的就是使用JMS,然后还有网页控制台,命令行,插件等等。不过依然没什么头绪。
2、在Stack Overflow上找到了一个相关问题,链接:http://stackoverflow.com/questions/7512004/activemq-with-c-sharp-and-apache-nms-count-messages-in-queue 。
(1)、根据问题的提示,是建立一个Browser,通过他一直去取消息,取到多少个代表队列里还有多少消息。暂不评论合不合适。 当我在使用的时候,发现每次都是只能取到一个。经过仔细研究之后,发现建立队列的时候,他的Message ID不同,用这种方法去取的时候,只能获取当前Message ID下的消息,所以每次都是一个。
(2)、全局所有关于Activemq的都是使用单例模式,然并卵。。。依旧无果
(3)、是不是配置的问题,只能访问到当前MessageID的消息?(这是一个问题)
3、最简单粗暴的方法,既然在8161控制台里能看到个数,那么直接HTTP请求,分析RSS页面内容。。你们懂的。。 http://127.0.0.1:8161/admin/queueBrowse/[队列名]?view=rss&feedType=atom_1.0