1.下载activeMQ
2.解压activeMQ
3.启动
对于5.10版本以及之后用:bin\activemq start
对于5.9版本以及更早的用:bin\activemq
4.消息类型
4.1点对点
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费,其它的则不能消费此消息了。当消费者不存在时,消息会一直保存,直到有消费者消费。
4.2发布/订阅
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。生产者发布消息时,不管是否有消费者,都不会保存消息(持久订阅除外)。
5.示例代码
5.1点对点
1)生产者
package org.tonny.junior.queue;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Point-to-point example: sends 1000 {@link MapMessage}s to the queue
 * {@code TOOL.DEFAULT}, one every two seconds, committing each message in its
 * own transaction.
 *
 * <p>Each message carries two entries: {@code count} (1-based sequence number)
 * and {@code time} (epoch milliseconds captured when the message was built).
 */
public class Producer {

    /**
     * Connects to the default broker URL with the default credentials and
     * publishes the messages.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String user = ActiveMQConnection.DEFAULT_USER;
        String password = ActiveMQConnection.DEFAULT_PASSWORD;
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        String subject = "TOOL.DEFAULT";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        Connection conn = null;
        try {
            conn = connectionFactory.createConnection();
            conn.start();

            // The acknowledge mode is ignored for a transacted session, so say so explicitly.
            Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(subject);
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < 1000; i++) {
                MapMessage message = session.createMapMessage();
                Date date = new Date();
                message.setInt("count", i + 1);
                message.setLong("time", date.getTime());

                Thread.sleep(2000);
                producer.send(message);
                System.out.println("--发送消息:" + date);

                // One transaction per message: the message becomes visible to consumers here.
                session.commit();
            }
        } catch (JMSException e) {
            // Report the failure instead of silently swallowing it.
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever is running this thread.
            Thread.currentThread().interrupt();
        } finally {
            closeQuietly(conn);
        }
    }

    /**
     * Closes the connection (which also closes its sessions and producers),
     * reporting but not propagating any failure.
     */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
2)消费者
package org.tonny.junior.queue;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Point-to-point example: listens on the queue {@code TOOL.DEFAULT} for three
 * seconds, printing the {@code count} and {@code time} entries of every
 * {@link MapMessage} received, then shuts down.
 */
public class Consumer {

    /**
     * Connects to the default broker URL with the default credentials and
     * consumes messages asynchronously for three seconds.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String user = ActiveMQConnection.DEFAULT_USER;
        String password = ActiveMQConnection.DEFAULT_PASSWORD;
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        String subject = "TOOL.DEFAULT";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        Connection conn = null;
        try {
            conn = connectionFactory.createConnection();

            // The acknowledge mode is ignored for a transacted session, so say so explicitly.
            final Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(subject);
            MessageConsumer consumer = session.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        if (message instanceof MapMessage) {
                            MapMessage msg = (MapMessage) message;
                            System.out.println("--收到消息" + msg.getInt("count"));
                            System.out.println("--收到消息" + new Date(msg.getLong("time")));
                        } else {
                            System.err.println("Ignoring unexpected message type: "
                                    + message.getClass().getName());
                        }
                        // Commit on the delivery thread: once a listener is registered the
                        // session must not be used from any other thread.
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                        try {
                            // Hand the message back to the broker for redelivery.
                            session.rollback();
                        } catch (JMSException rollbackError) {
                            rollbackError.printStackTrace();
                        }
                    }
                }
            });

            // Start delivery only after the listener is in place.
            conn.start();

            // Keep the demo alive long enough to receive a few messages.
            Thread.sleep(3000);
        } catch (JMSException e) {
            // Report the failure instead of silently swallowing it.
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever is running this thread.
            Thread.currentThread().interrupt();
        } finally {
            closeQuietly(conn);
        }
    }

    /**
     * Closes the connection (which also closes its sessions and consumers),
     * reporting but not propagating any failure.
     */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
5.2发布/订阅
1)订阅者
package org.tonny.junior.topic;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publish/subscribe example: subscribes to the topic {@code TOOL.DEFAULT} and
 * prints the {@code count} and {@code time} entries of every
 * {@link MapMessage} received, committing after each one.
 */
public class SubscriberFirst {

    /**
     * Connects to the default broker URL with the default credentials and
     * registers the topic listener.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String user = ActiveMQConnection.DEFAULT_USER;
        String password = ActiveMQConnection.DEFAULT_PASSWORD;
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        String subject = "TOOL.DEFAULT";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        Connection conn = null;
        try {
            conn = connectionFactory.createConnection();

            // The acknowledge mode is ignored for a transacted session, so say so explicitly.
            final Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Topic topic = session.createTopic(subject);
            MessageConsumer consumer = session.createConsumer(topic);

            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    try {
                        if (msg instanceof MapMessage) {
                            MapMessage message = (MapMessage) msg;
                            System.out.println("--收到消息" + message.getInt("count"));
                            System.out.println("--收到消息" + new Date(message.getLong("time")));
                        } else {
                            System.err.println("Ignoring unexpected message type: "
                                    + msg.getClass().getName());
                        }
                        session.commit();
                    } catch (JMSException e) {
                        // Report the failure instead of silently swallowing it.
                        e.printStackTrace();
                        try {
                            session.rollback();
                        } catch (JMSException rollbackError) {
                            rollbackError.printStackTrace();
                        }
                    }
                }
            });

            // Start delivery only after the listener is in place.
            conn.start();

            // The session and connection are deliberately left open on success,
            // presumably so the listener keeps receiving after main() returns.
        } catch (JMSException e) {
            e.printStackTrace();
            // Setup failed: release the connection rather than leaking it.
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    }
}
2)发布者
package org.tonny.junior.topic;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publish/subscribe example: publishes 20 non-persistent {@link MapMessage}s
 * to the topic {@code TOOL.DEFAULT}, 200 ms apart, and commits them all in a
 * single transaction at the end.
 *
 * <p>Each message carries two entries: {@code count} (1-based sequence number)
 * and {@code time} (epoch milliseconds captured when the message was built).
 */
public class Publisher {

    /**
     * Connects to the default broker URL with the default credentials and
     * publishes the messages.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String user = ActiveMQConnection.DEFAULT_USER;
        String password = ActiveMQConnection.DEFAULT_PASSWORD;
        String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        String subject = "TOOL.DEFAULT";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        Connection conn = null;
        try {
            conn = connectionFactory.createConnection();
            conn.start();

            // The acknowledge mode is ignored for a transacted session, so say so explicitly.
            Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Topic topic = session.createTopic(subject);
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for (int i = 0; i < 20; i++) {
                MapMessage message = session.createMapMessage();
                Date date = new Date();
                message.setInt("count", i + 1);
                message.setLong("time", date.getTime());

                Thread.sleep(200);
                producer.send(message);
                System.out.println("--发送消息:" + date);
            }

            // All 20 messages are released to subscribers together by this single commit.
            session.commit();
        } catch (JMSException e) {
            // Report the failure instead of silently swallowing it.
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever is running this thread.
            Thread.currentThread().interrupt();
        } finally {
            closeQuietly(conn);
        }
    }

    /**
     * Closes the connection (which also closes its sessions and producers),
     * reporting but not propagating any failure.
     */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
6.消息查看
登录 http://localhost:8161/admin/index.jsp 用户名/密码: admin/admin