一般步骤:
- 请求一个JMS连接工厂。
- 使用连接工厂创建连接。
- 启动JMS连接。
- 通过连接创建session。
- 获取一个目标。
- 创建一个生产者,或a.创建一个生产者,b.创建一条JMS消息并发送到目标
- 创建一个消费者,或a.创建一个消费者,b.注册一个消息监听器。
- 发送或接收消息。
- 关闭所有资源(连接,会话,生产者,消费者等)。
首先登录至ActiveMQ后台创建一个队列为TestQueue:
..省略
创建生产者:
package com.thunisoft.jms.mine; import java.util.HashMap; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * JMS生产者 * * @author zhangxsh * */ public class Producer { /** * @param args */ public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 通过连接工厂创建连接 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 通过连接打开一个会话 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 根据特定的队列名称创建一个目标地 destination = session.createQueue("TestQueue"); // 根据目标地创建一个生产者 producer = session.createProducer(destination); // 不需要持久化的投递模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 1; i <= 10; i++) { ObjectMessage message = session.createObjectMessage(); HashMap m = new HashMap(); m.put("key" + i, i); message.setObject(m); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
消费者:
package com.thunisoft.jms.mine; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * JMS消费者 * * @author zhangxsh * */ public class Consumer { /** * @param args */ public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("TestQueue"); consumer = session.createConsumer(destination); while (true) { ObjectMessage message = (ObjectMessage) consumer .receive(100000); if (null != message) { System.out.println("收到消息" + message.getObject()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }