ps: ActiveMQ 5.10 以上版本需要使用 JDK1.8
1.启动activemq
1.1 进入activemq解压后的bin目录,执行命令 ./activemq start
1.2 查看activemq进程 ps -aux | grep activemq
1.3 访问管理界面, 地址 http:// ip地址: 8161/admin, 默认用户名和密码都是 admin
ps : 8161 是默认端口,如果要修改默认端口,需要编辑 /conf/jetty.xml 文件 (activemq内置jetty服务器)
2. 创建项目,导入jar包
3. 处理文本消息
3.1 创建 TextMessageProducer.java (文本消息提供者)
package cn.demo.producer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class TextMessageProducer { /** * 将文本写入消息队列 * @param text * @param destName */ public void sendMsgToActiveMQ(String text, String destName) { // 链接工厂 ConnectionFactory factory = null; // 链接 Connection connection = null; // 会话 Session session = null; // 目的队列 Destination dest = null; // 消息生产者 MessageProducer producer = null; // 定义消息 Message message = null; try { /** * 1. 创建连接工厂 * 构造方法: public ActiveMQConnectionFactory(String userName,String password) * userName - 访问ActiveMQ服务的用户名, 用户名可以通过jetty-realm.properties配置文件配置. * password - 访问ActiveMQ服务的密码, 密码可以通过jetty-realm.properties配置文件配置. * brokerURL - 访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号 * 此链接基于TCP/IP. */ factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.230.4:61616"); // 2. 创建连接 connection = factory.createConnection(); // 开启连接 connection.start(); /** * 创建会话connection.createSession(false, Session.AUTO_ACKNOWLEDGE) * 第一个参数: 事务的提交的方式 * true : 表示手动提交事务,则第二个参数失效 * false : 不提交事务,则必须指定消息确认机制 * * 第二个参数: 消息确认机制 acknowledgeMode -消息确认机制,可选值为: * Session.AUTO_ACKNOLEDGE - 自动确认消息机制 * Session.CLIENT_ACKNOLEDGE - 客户端确认消息机制 * Session.DUPS_OK_ACKNOLEDGE - 有副本的客户端确认消息机制 */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4. 指定目的地,即队列命名,消息消费者 需要通过此命名访问对应的队列 dest = session.createQueue(destName); // 5. 创建消息生产者,参数为目的地队列名 producer = session.createProducer(dest); // 6. 创建一个文本消息,此消息对象中保存要传递的文本信息 message = session.createTextMessage(text); // 7. 发送 producer.send(message); System.out.println("消息发送成功"); } catch (Exception e) { e.printStackTrace(); System.out.println("消息发送失败"); } finally { // 关闭资源 if(null != producer) { try { producer.close(); } catch (Exception e) { e.printStackTrace(); } } if(null != session) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(null != connection) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
3.2 创建测试类,测试消息是否发送成功
public class TestMQ { @Test public void testTextProducer(){ TextMessageProducer producer = new TextMessageProducer(); producer.sendMsgToActiveMQ("嘻嘻嘻嘻嘻","text-mq"); System.out.println("消息发送成功"); } }
3.3 然后就会看见一条消息待处理(等待出队列的消息数量)
3.4 创建通用消息消费者 ConsumerMQ.java
public class ConsumerMQ { public Message recieveMessageFromMQ(String destName) { ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination dest = null; MessageConsumer consumer = null; Message message = null; try { factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.230.4:61616"); // 2. 创建连接 connection = factory.createConnection(); // 开启连接 connection.start(); // 3. 创建会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.指定队列地址 dest = session.createQueue(destName); // 5.创建消费者 consumer = session.createConsumer(dest); // 6.读取消息 message = consumer.receive(); } catch (Exception e) { e.printStackTrace(); } finally { //关闭资源 if(null!=consumer){ try { consumer.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(null!=session){ try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(null!=connection){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } return message; } }
3.4 消息消费者测试方法
@Test public void testTextConsumer(){ try { ConsumerMQ c = new ConsumerMQ(); Message message = c.recieveMessageFromMq("text-mq"); TextMessage msg = (TextMessage) message; System.out.println("从队列中获取的消息是:"+msg.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
结果:
Number Of Pending Messages(等待出队列的消息数)-1,
Messages Enqueued(进队列数) +1
Messages Dequeued (出队列数) +1
4. 处理对象消息
4.1 创建pojo,该对象必须实现序列化
public class Item implements Serializable { private Long id; private String title; private Long price; public Item() { super(); // TODO Auto-generated constructor stub } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public Long getPrice() { return price; } public void setPrice(Long price) { this.price = price; } }
4.2 创建ObjectMessageProducer.java(对象消息提供者)
public class ObjectMessageProducer { public void sendObjectToMQ(Item item, String destName) { ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination dest = null; MessageProducer producer = null; Message message = null; try { factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.230.4:61616"); // 2.开启连接 connection = factory.createConnection(); // 3.创建会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.指定队列 dest = session.createQueue(destName); // 5.创建发送者 producer = session.createProducer(dest); // 6.创建对象消息,对象必须实现序列化 message = session.createObjectMessage(item); // 7.发送 producer.send(message); } catch (Exception e) { e.printStackTrace(); } finally { //关闭资源 if(null!=producer){ try { producer.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(null!=session){ try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(null!=connection){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
4.3 测试消息发送
@Test public void testObjectMessageProducer(){ ObjectMessageProducer producer = new ObjectMessageProducer(); Item item = new Item(); item.setId(1L); item.setPrice(10000000L); item.setTitle("这是个对象消息"); producer.sendObjectToMQ(item, "obj-mq"); System.out.println("将对象消息写入消息队列"); }
4.4 测试消息读取 ConsumerMQ.java 是上面处理文本信息通用的那个
@Test public void testObjConsumser(){ ConsumerMQ c = new ConsumerMQ(); Message message = c.recieveMessageFromMq("obj-mq"); ObjectMessage objMsg = (ObjectMessage) message; try { Item item = (Item) objMsg.getObject(); System.out.println("读取对象消息:" + item.getId() + item.getTitle()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
5.监听器实现
监听指定的队列,持续获取消息
创建 MyListener.java ,实现 MessageListener,自定义一个监听器
public class MyListener implements MessageListener { @Override public void onMessage(Message message) { if(null != message) { ObjectMessage objMsg = (ObjectMessage) message; try { Item item = (Item) objMsg.getObject(); System.out.println("商品的id:"+item.getId()+",商品的名称:"+item.getTitle()+",商品的价格:"+item.getPrice()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
当Java程序结束时,监听器无法继续加载
解决方案:1.将监听器放在web程序中 2. 阻塞线程
嗯……这里先用方法二
修改 ConsumerMQ.java
当使用监听器时,不用调用receive()方法,而是加载监听器
// 5.创建消费者 consumer = session.createConsumer(dest); // 6.读取消息 // message = consumer.receive(); // 7.加载监听器 consumer.setMessageListener(new MyListener()); // 强制阻塞线程,让监听器持续加载 System.in.read();
测试
@Test public void testObjConsumser(){ ConsumerMQ c = new ConsumerMQ(); c.recieveMessageFromMq("obj-mq"); }
访问ActiveMQ管理界面,会看到 待读取消息一出现就会被读取。