下载
到ActiveMQ官网,找到下载点。
目前,
官网为http://activemq.apache.org/。
我们下载目前最新的版本吧,当前的Linux版本下载地址之一为:http://apache.fayea.com/activemq/5.11.1/apache-activemq-5.11.1-bin.tar.gz。
启动
下载,并解压
wget http://apache.fayea.com/activemq/5.11.1/apache-activemq-5.11.1-bin.tar.gz tar -xf ./apache-activemq-5.11.1-bin.tar.gz
启动(当然,由于依赖于JAVA,如果你没有安装JAVA,它会提醒你的,哈哈)
[nicchagil@localhost bin]$ ./activemq start INFO: Loading '/home/nicchagil/app/apache-activemq-5.11.1/bin/env' INFO: Using java '/home/nicchagil/app/jdk1.7.0_71//bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.prop erties to get details INFO: pidfile created : '/home/nicchagil/app/apache-activemq-5.11.1/data/activem q.pid' (pid '4858')
测试启动成功与否
ActiveMQ默认监听61616端口,查此端口看看是否成功启动,如果一切顺利,会看到如下日志
[nicchagil@localhost bin]$ netstat -an | grep 61616 tcp 0 0 :::61616 :::* LIST EN
顺便,登录下管理员页面,看看有木有问题:
URL : http://192.168.1.101:8161/admin/
默认的用户名/密码 : admin/admin
Java客户端连接
接下来,用简单的点对点测试生产消息、消费消息。
引入所需包
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> </dependencies>
生产消息
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; public class Producer { private static final Logger LOG = Logger.getLogger(Producer.class); public static void main(String[] args) { // 获取连接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.1.101:61616"); /* 获取连接 */ Connection connection = null; try { connection = factory.createConnection(); connection.start(); } catch (JMSException e) { LOG.error("获取连接出现异常", e); } /* 创建会话 */ Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { LOG.error("创建会话出现异常", e); } /* 创建消息生产者 */ Destination destination = null; try { destination = session.createQueue("TestQueue"); } catch (JMSException e) { LOG.error("创建队列出现异常", e); } /* 创建队列 */ MessageProducer producer = null; try { producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } catch (JMSException e) { LOG.error("创建消息生产者出现异常", e); } /* 发送消息 */ ObjectMessage message = null; try { message = session.createObjectMessage("hello world..."); producer.send(message); } catch (JMSException e) { LOG.error("发送消息出现异常", e); } try { session.commit(); } catch (JMSException e) { LOG.error("提交会话出现异常", e); } if (connection != null) { try { connection.close(); } catch (JMSException e) { LOG.error("关闭连接出现异常", e); } } LOG.info("sent..."); } }
消费消息
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; public class Consumer { private static final Logger LOG = Logger.getLogger(Consumer.class); // 是否继续响应,可按需由其他逻辑修改值,true:继续响应,false-停止响应 public static volatile boolean handleFlag = true; public static void main(String[] args) { // 获取连接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://192.168.1.101:61616"); /* 获取连接 */ Connection connection = null; try { connection = factory.createConnection(); connection.start(); } catch (JMSException e) { LOG.error("获取连接出现异常", e); } /* 创建会话 */ Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { LOG.error("创建会话出现异常", e); } /* 创建消息生产者 */ Destination destination = null; try { destination = session.createQueue("TestQueue"); } catch (JMSException e) { LOG.error("创建队列出现异常", e); } /* 创建消费者 */ MessageConsumer consumer = null; try { consumer = session.createConsumer(destination); } catch (JMSException e) { LOG.error("创建消费者出现异常", e); } /* 获取消息对象 */ ObjectMessage objectMessage = null; while(handleFlag) { try { objectMessage = (ObjectMessage)consumer.receive(); handleMessage(objectMessage); } catch (JMSException e) { LOG.error("接收消息出现异常", e); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { LOG.error("关闭连接出现异常", e); } } } /** * 处理消息对应的业务 * @param objectMessage 消息对象 */ public static void handleMessage(final ObjectMessage objectMessage) { if (objectMessage == null) { return; } /* 处理业务 */ Object object = null; try { object = objectMessage.getObject(); } catch (JMSException e) { LOG.error("获取消息内容出现异常", e); } handleMessage(object); } /** * 处理消息对应的业务 * @param messageString 消息内容 */ public static void handleMessage(Object object) { if (object == null) { return; } String messageString = (String)object; LOG.info("Receive : " + messageString); // 这里仅作打印业务而已 } }
看到控制台打印出:Receive : hello world...,可知接收到消息了。
集群的安装(Replicated LevelDB Store)
ActiveMQ的集群有3种类型,介绍在此Introduction to Master / Slave,我们下面使用的是Replicated LevelDB Store。
编辑配置文件/home/activemq/conf/activemq.xml,编辑以下块:
<persistenceAdapter> <replicatedLevelDB directory="activemq-data" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="xx.xx.xx.xx:xxxx,xx.xx.xx.xx:xxxx,xx.xx.xx.xx:xxxx(zookeeper集群)" zkPassword="zk password" zkPath="/activemq/leveldb-stores" hostname="当前机器IP" /> </persistenceAdapter>
其它节点也如此配置。
启动,查看日志是否正常启动。
荆棘
JDK版本的要求
过程中,遇到一个小问题,就是我一开始是用JDK1.6去跑的,报出常见的Unsupported major.minor version 51.0
针对这个问题,这个帖子有很好的参考意义:
http://www.cnblogs.com/chinafine/articles/1935748.html
找出jar中的一个class,执行以下命令,可查出minor version、major version:
javap -verbose yourClassName
或直接查看jar中的META-INFMANIFEST.MF。
然后对照帖子中的JDK版本,换成JDK1.7就OK了。