ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>${active-mq-version}</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
下面我们就开始消息的收发,先来探讨一下点对点的消息收发:
1、消息的发送
原理上我们通过ConnectionFactory会话工厂 ---> 创建一个会话连接 ---> 通过会话连接创建一个会话线程 ---> 通过会话线程创建一个消息队列 ---> 通过会话线程和消息队列创建一个消息生产者(MessageProducer) ---> 最后由消息生产者进行消息的发送。
private static ConnectionFactory factory;//会话连接工厂 private static Connection connection;//会话连接 private static Session session;//会话接收或发送消息线程 private static Destination destination;//消息队列 private static MessageProducer messageProducer;//消息发送者
通过ActiveMQConnectionFactory创建会话工厂
factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);//创建会话连接工厂
设置会话连接额度地址和用户名密码在(这里我使用的是默认的地址和用户名密码)
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
下面封装了一下上面的过程:
/** * @Description 发送消息 * @param queryName 消息队列名称 * @param msg 消息内容 * @return * * @author 高尚 * @version 1.0 * @date 创建时间:2018年1月31日 下午3:54:22 */ public static boolean producerSendQueryMessage(final String queryName, final String msg){ boolean flag = true; try { connection = factory.createConnection();//创建会话连接 connection.start();//启动会话连接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建会话线程 destination = session.createQueue(queryName);//创建消息队列 messageProducer = session.createProducer(destination);//创建会话生成者 Message message = session.createTextMessage(msg);//创建消息对象 messageProducer.send(message);//发送消息 session.commit(); } catch (JMSException e) { e.printStackTrace(); flag = false; }finally { if(null != connection){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } return flag; }
这里单独说一下connection.createSession生成会话线程:
Session session = connection.createSession(paramA,paramB); paramA是设置事务,paramB是设置acknowledgment mode paramA 取值有: 1、true:支持事务 为true时:paramB的值忽略, acknowledgment mode被jms服务器设置为SESSION_TRANSACTED 。 2、false:不支持事务 为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE、Session.CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE其中一个。 paramB 取值有: 1、Session.AUTO_ACKNOWLEDGE:为自动确认,客户端发送和接收消息不需要做额外的工作。 2、Session.CLIENT_ACKNOWLEDGE:为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。 3、DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。 4、SESSION_TRANSACTED
到这里我们的消息发送模块就封装完成,下面我们看一下消息的接收:
原理上我们通过ConnectionFactory会话工厂 ---> 创建一个会话连接 ---> 通过会话连接创建一个会话线程 ---> 通过会话线程创建一个消息队列 ---> 通过会话线程和消息队列创建一个消息消费者(MessageConsumer) ---> 最后由消息消费者进行消息获取
/** * @Description 接收消息队列中的消息 * @param queryName 消息队列名称 * @return * * @author 高尚 * @version 1.0 * @date 创建时间:2018年1月31日 下午4:24:14 */ public static void consumerGetQueryMessage(final String queryName){ try { connection = factory.createConnection();//创建会话连接 connection.start();//启动会话连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(queryName); messageConsumer = session.createConsumer(destination); while(true){ TextMessage message = (TextMessage) messageConsumer.receive(); if(null != message){ System.out.println(queryName+"发送消息:"+message.getText()); } } } catch (Exception e) { // TODO: handle exception System.out.println("消费消息异常"); }finally { if(null != connection){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
最后单独说一下messageConsumer.receive方法:
messageConsumer.receive();//一直等到消息 messageConsumer.receive(1000);//等到消息1秒钟 messageConsumer.receiveNoWait();//不等待消息
当然这样的接收消息方式是不是感觉很low,当然我们还可以通过监听器来实现消息接收的监听(MessageListener),我们实现MessageListener接口,自定义消息接收监听器:
/** * @Description 消息监听器 * @author 高尚 * @version 1.0 * @date 创建时间:2018年1月31日 下午4:52:36 */ public class MsgListener implements MessageListener { /** * 监听的会话队列 */ private static String queryName; @Override public void onMessage(Message msg) { TextMessage textMsg = (TextMessage) msg; try { if(null != textMsg){ System.out.println("【" + queryName + "】发送的消息:" + textMsg.getText()); } } catch (JMSException e) { e.printStackTrace(); System.out.println("获取会话消息异常"); } } public MsgListener(String queryName) { super(); // TODO Auto-generated constructor stub this.queryName = queryName; } }
然后我们需要简单修改一个消息接收策略:
/** * @Description 通过Listener接收消息队列中的消息 * @param queryName 消息队列名称 * @return * * @author 高尚 * @version 1.0 * @date 创建时间:2018年1月31日 下午4:24:14 */ public static void consumerGetQueryMessageListener(String queryName) { try { connection = factory.createConnection();//创建会话连接 connection.start();//启动会话连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(queryName); messageConsumer = session.createConsumer(destination); MsgListener msgListener = new MsgListener(queryName); messageConsumer.setMessageListener(msgListener); while(true){ Thread.sleep(10000); } } catch (Exception e) { // TODO: handle exception System.out.println("消费消息异常"); }finally { if(null != connection){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
最后我们一起看一下消息的发布和订阅,首先消息的发布和订阅与点点的消息收发基本一致,不同点在于会话线程的创建:
destination = session.createTopic(queryName);//创建消息发布线程
其他部分一致,没有什么难度,这里不再阐述,大家可以自行测试。当然我也为大家提供了测试参考代码:https://github.com/hpugs/ActiveMQ
到这里关于ActiveMQ入门部分就和大家探讨完毕,如有错误还有指教。谢谢