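What is ActiveMQ
ActiveMQ is an open-source message bus from Apache, described as the most popular and most capable of its kind. It is a JMS provider implementation that fully supports the JMS 1.1 and J2EE 1.4 specifications. (Quoted from Baidu Baike)
So what is JMS? JMS, the Java Message Service API, is the Java platform's API for message-oriented middleware (MOM). It is used to send messages between two applications, or within a distributed system, so that they can communicate asynchronously. (Quoted from Baidu Baike)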
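To make "asynchronous communication" concrete, here is a minimal in-memory sketch using only the JDK. It is an analogy, not JMS or ActiveMQ code: a BlockingQueue stands in for the broker, and the class name AsyncMessagingSketch and the method roundTrip are hypothetical names chosen for illustration. The sender thread drops its messages on the queue and finishes without waiting for the receiver, which picks them up in order at its own pace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy only: the BlockingQueue plays the role of the message broker.
final class AsyncMessagingSketch {

    // Sends `count` messages from a separate thread and receives them on the calling thread.
    static List<String> roundTrip(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The sender never talks to the receiver directly; it only knows the queue.
        Thread sender = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                queue.add("message-" + i);
            }
        });
        sender.start();

        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                // take() blocks until a message is available.
                received.add(queue.take());
            }
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for messages", e);
        }
        return received;
    }

    public static void main(String[] args) {
        for (String message : roundTrip(3)) {
            System.out.println("received " + message);
        }
    }
}
```

A real broker such as ActiveMQ adds what this sketch leaves out: the queue lives in a separate process, so sender and receiver can run on different machines and at different times.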
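Download and installation
Apache home page: http://activemq.apache.org/ . The version used here is 5.9.1. Unzip apache-activemq-5.9.1-bin.zip and, on Windows, double-click apache-activemq-5.9.1/bin/activemq.bat to start ActiveMQ.
Once the broker is running, the web administration console is available at http://localhost:8161/admin/ .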
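Using a Queue from code
1. Main interfaces
javax.jms.ConnectionFactory: JMS interface; the ActiveMQ implementation class is ActiveMQConnectionFactory. It is used to create a Connection;
javax.jms.Connection: JMS interface with a concrete ActiveMQ implementation; plays a role similar to Connection in JDBC;
javax.jms.Destination: JMS interface with a concrete ActiveMQ implementation; identifies the Queue to send to or receive from. ActiveMQ can hold many Queues;
javax.jms.Session: JMS interface with a concrete ActiveMQ implementation; a single-threaded context used to create producers, consumers and messages, and to commit transactions;
javax.jms.MessageProducer: JMS interface; the object that actually sends messages.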
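2. Message type interfaces
javax.jms.TextMessage: JMS interface for text messages;
javax.jms.MapMessage: JMS interface for messages that carry name/value pairs. These two types are the ones used most often; with the Map type available, most needs can be covered.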
3. Implementing the sending logic
package com.thomson.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MessageSender {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String DESTINATION = "test.queue";

    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    private MessageProducer producer;

    // Creates the connection, session, queue and producer.
    private void initConfig() throws JMSException {
        this.connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
        this.connection = this.connectionFactory.createConnection();
        // Transacted session: the acknowledge mode is ignored, and sent
        // messages are only delivered once commit() is called.
        this.session = this.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        this.destination = this.session.createQueue(DESTINATION);
        this.producer = this.session.createProducer(this.destination);
    }

    public void sendMessage() throws JMSException {
        this.initConfig();
        TextMessage message = this.session.createTextMessage();
        message.setText("this is a message," + System.currentTimeMillis());
        this.producer.send(message);
        // The original called this.clone() here by mistake; close() commits and releases resources.
        this.close();
    }

    public void close() throws JMSException {
        // Commit first so the message actually reaches the queue.
        this.session.commit();
        this.producer.close();
        this.session.close();
        this.connection.close();
    }
}
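The sender above creates its session with the transacted flag set and only calls session.commit() inside close(). In JMS, messages sent through a transacted session are held back until the session commits, and a rollback discards them. The following is a toy in-memory model of that behaviour, assuming nothing beyond the JDK. It is not ActiveMQ code: ToyTransactedSession and its methods are hypothetical names used only to illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a transacted session; not the ActiveMQ implementation.
final class ToyTransactedSession {

    // Messages sent in the current transaction, not yet visible to consumers.
    private final List<String> pending = new ArrayList<>();
    // Stand-in for the broker-side queue that consumers read from.
    private final List<String> queue;

    ToyTransactedSession(List<String> queue) {
        this.queue = queue;
    }

    // Buffers the message; nothing reaches the queue yet.
    void send(String text) {
        pending.add(text);
    }

    // Publishes everything sent since the last commit or rollback.
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    // Throws away everything sent since the last commit or rollback.
    void rollback() {
        pending.clear();
    }

    public static void main(String[] args) {
        List<String> queue = new ArrayList<>();
        ToyTransactedSession session = new ToyTransactedSession(queue);

        session.send("first");
        System.out.println("before commit: " + queue.size());
        session.commit();
        System.out.println("after commit: " + queue.size());
    }
}
```

This is why forgetting the commit in the sender leaves the queue empty even though send() returned without error.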
4. Implementing the receiving logic
package com.thomson.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MessageReceiver {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String DESTINATION = "test.queue";

    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    private MessageConsumer consumer;

    // Creates the connection, session, queue and consumer.
    public void init() throws JMSException {
        this.connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
        this.connection = this.connectionFactory.createConnection();
        // A connection delivers no messages to consumers until it is started.
        this.connection.start();
        // Transacted session: the acknowledge mode is ignored.
        this.session = this.connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        this.destination = this.session.createQueue(DESTINATION);
        this.consumer = this.session.createConsumer(this.destination);
    }

    public void receiveMessage() throws JMSException {
        this.init();
        // Blocks until a message arrives.
        Message message = this.consumer.receive();
        if (message instanceof TextMessage) {
            TextMessage msg = (TextMessage) message;
            String messageText = msg.getText();
            System.out.println("get Message[" + messageText + "]");
        }
        // Commit so the broker treats the message as consumed and does not redeliver it.
        this.session.commit();
    }

    public void close() throws JMSException {
        this.consumer.close();
        this.session.close();
        this.connection.close();
    }
}
5. Sending code
6. Viewing the queue data in the web console
7. Receiving messages