本文主要介绍activeMQ在应用程序中是如何使用的,同个两个实例进行说明,这两个实例分别针对P2P模式和Pub/Sub模式。
开发环境
- 操作系统:Ubuntu 16.10
- 开发平台:Eclipse Neon Release (4.6.0)
- ActiveMQ版本:apache-activemq-5.14.3
具体的环境下载与配置这里就不在详细描述啦
项目建立与实现
先为大家展示以下项目最后的结构图:
操作步骤
- 在Eclipse中新建一个最基本的Java Project,本项目命名为“activeMQHelloWorld”
- 在项目根目录下建立文件夹libs,并将activemq-all-5.14.3.jar依赖包复制到文件夹中
- 通过 JavaBuildPath 中的libraries将依赖包引入项目中
到目前位置项目框架搭建完毕(简单容易吧)
分别实现P2P消息模型和Pub/Sub消息模型,首先实现P2P消息模型:
P2P消息模型
编写生产者代码QueueProducer.java如下:
1 package com.unionpay.activemq; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.Destination; 6 import javax.jms.MessageProducer; 7 import javax.jms.Session; 8 import javax.jms.TextMessage; 9 10 import org.apache.activemq.ActiveMQConnection; 11 import org.apache.activemq.ActiveMQConnectionFactory; 12 13 public class QueueProducer { 14 15 //默认连接用户 16 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; 17 //默认连接密码 18 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; 19 //默认连接地址 20 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; 21 22 private static final int SENDNUM = 10; 23 24 public static void main(String[] args){ 25 //连接工厂 26 ConnectionFactory connectionFactory; 27 //连接 28 Connection connection = null; 29 //会话,接受或者发送消息的线程 30 Session session; 31 //消息的目的地 32 Destination destination; 33 //消息生产者 34 MessageProducer messageProducer; 35 36 //实例化连接工厂 37 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); 38 39 try{ 40 //通过连接工厂获取连接 41 connection = connectionFactory.createConnection(); 42 //启动连接 43 connection.start(); 44 //创建session,第一个参数true表示支持事物,false表示不支持事物,Session.AUTO_ACKKNOWLEDGE 45 //表示自动确认,客户端发送和接受消息不需要做额外的工作 46 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 47 //创建一个名为HelloWorld的消息队列 48 destination = session.createQueue("QueueTest"); 49 //创建消息生产者 50 messageProducer = session.createProducer(destination); 51 //发送消息 52 sendMessage(session,messageProducer); 53 //提交消息 54 session.commit(); 55 session.close(); 56 }catch(Exception e){ 57 e.printStackTrace(); 58 }finally{ 59 if(connection != null){ 60 try{ 61 connection.close(); 62 }catch(Exception e){ 63 e.printStackTrace(); 64 } 65 } 66 } 67 } 68 69 /** 70 * 发送消息 71 * @param session 72 * @param messageProducer 73 * @throws Exception 74 */ 75 public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ 76 for(int i=0;i<SENDNUM;i++){ 77 //创建一条文笔消息 78 TextMessage message = session.createTextMessage("ActiveMQ 发送消息"+ i); 79 System.out.println("发送消息:Activemq发送消息" + i); 80 81 messageProducer.send(message); 82 } 83 } 84 }
编写消费者代码QueueConsumer.java代码如下:
1 package com.unionpay.activemq; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.Destination; 6 import javax.jms.JMSException; 7 import javax.jms.MessageConsumer; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnection; 12 import org.apache.activemq.ActiveMQConnectionFactory; 13 14 /** 15 * @author jxwch 16 * 17 */ 18 public class QueueConsummer { 19 20 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; 21 22 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; 23 24 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; 25 26 public static void main(String[] args) { 27 28 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); 29 try { 30 Connection connection = connectionFactory.createConnection(); 31 32 connection.start(); 33 34 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 35 36 Destination destination = session.createQueue("QueueTest"); 37 38 MessageConsumer messageConsumer = session.createConsumer(destination); 39 40 while (true) { 41 //100000代表100000毫秒 42 TextMessage message = (TextMessage) messageConsumer.receive(100000); 43 if (message != null) { 44 System.out.println("收到消息:" + message.getText()); 45 } else { 46 break; 47 } 48 } 49 } catch (JMSException e) { 50 e.printStackTrace(); 51 } 52 } 53 54 }
到此,P2P模型代码已经全部编写完成,可以测试喽:
当然,我们要测试activeMQ,那么首先一定要启动服务器:
1 cd apache-activemq-5.14.3/bin 2 bash activemq start
通过访问自带监控应用查看服务器是否启动正常:http://127.0.0.1:8161/admin/
若服务器运行正常,首先在Eclipse中运行QueueProducer.java,终端打印出如下信息:
此时查看监控程序页面,点击“Queue”出现如下信息:
从截图中我们可以看到,在Queue消息中,有一个Name为QueueTest的消息队列,其中“Number Of Pending Message”表示队列中存在10条消息,“Message Enqueued” 表示有10条消息正在排队。通过点击Views中的Browser可以查看队列中的消息:
并且可以通过Delete对这些消息进行删除操作。
下面我们继续运行QueueConsumer.java,终端打印如下信息:
Pub/Sub 模型
首先编写Publisher端文件TopicProducer.java:
1 package com.unionpay.activemq; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.DeliveryMode; 6 import javax.jms.Destination; 7 import javax.jms.JMSException; 8 import javax.jms.MapMessage; 9 import javax.jms.MessageProducer; 10 import javax.jms.Session; 11 import javax.jms.Topic; 12 13 import org.apache.activemq.ActiveMQConnection; 14 import org.apache.activemq.ActiveMQConnectionFactory; 15 16 public class TopicProducer { 17 18 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; 19 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; 20 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; 21 private static final int SENDNUM = 10; 22 23 public static void main(String[] args) { 24 25 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); 26 Connection connection = null; 27 Session session = null; 28 Topic topic = null; 29 MessageProducer messageProducer = null; 30 try { 31 connection = connectionFactory.createConnection(); 32 33 connection.start(); 34 35 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 36 37 topic = session.createTopic("NEWS"); 38 39 messageProducer = session.createProducer(topic); 40 41 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 42 43 for (int i = 0; i < SENDNUM; i++) { 44 MapMessage mapMessage = session.createMapMessage(); 45 46 mapMessage.setLong("count", i); 47 48 messageProducer.send(mapMessage); 49 50 System.out.println("发布者发布消息:" + i); 51 52 session.commit(); 53 } 54 } catch (JMSException e) { 55 // TODO Auto-generated catch block 56 e.printStackTrace(); 57 } finally { 58 if (session != null) { 59 try { 60 session.close(); 61 } catch (JMSException e) { 62 e.printStackTrace(); 63 } 64 } 65 66 if (connection != null) { 67 try { 68 connection.close(); 69 } catch (JMSException e) { 70 e.printStackTrace(); 71 } 72 } 73 } 74 } 75 }
然后编辑Subscriber端文件TopicConsumer.java:
package com.unionpay.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args){ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); Connection connection = null; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("NEWS"); MessageConsumer messageConsumer = session.createConsumer(topic); while(true){ MessageListener messageListener = new MessageListener(){ @Override public void onMessage(Message message) { // TODO Auto-generated method stub MapMessage mapMessage = null; try{ mapMessage = (MapMessage)message; System.out.println("Receiver Message:" + mapMessage.getLong("count")); }catch(JMSException e){ e.printStackTrace(); } } }; messageConsumer.setMessageListener(messageListener); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(connection != null){ try{ connection.close(); }catch(JMSException e){ e.printStackTrace(); } } } } }
到此,Pub/Sub模型代码编写完毕,下面运行TopicProducer.java文件,终端打印出如下信息:
然后查看监控程序,点击“Topics”:
从截图中我们可以看出Name中多了一个NEWS主题,并且Messages Enqueued为10。
然后运行客户端TopicConsumer.java文件,终端显示如下:
从截图中我们并没有发现客户端消费了消息,这是为啥呢?
因为在Pub/Sub模型中,发布者和订阅者有时间上的依赖性,针对某个主题,必须先创建订阅者,然后才能发布消息,这样才能保证订阅者可以收到消息。
重新运行TopicConsumer.java文件,就可以看见消费信息了:
至此,两种模式已经全部介绍完毕。
参考文献