生产者:
package clc.active.listener; import org.apache.activemq.ActiveMQConnectionFactory; import org.testng.annotations.Test; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import java.util.Random; /** * ClassName: ObjectProducer<br/> * Description: <br/> * date: 2019/1/15 3:25 PM<br/> * * @author chengluchao * @since JDK 1.8 */ public class ObjectProducer { @Test public void sendMessage() { ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; Message message = null; try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://2.2.2.4:61616"); connection = factory.createConnection(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("test-listener"); producer = session.createProducer(destination); connection.start(); Random r = new Random(); for (int i = 0; i < 100; i++) { Integer data = i; message = session.createObjectMessage(data); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } finally { // 回收资源 //消息发送者 if (producer != null) { try { producer.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } //会话对象 if (session != null) { try { session.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } //连接对象 if (connection != null) { try { connection.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } } } }
消费者:
package clc.active.listener; import org.apache.activemq.ActiveMQConnectionFactory; import org.testng.annotations.Test; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import java.util.Random; /** * ClassName: ConsumerListener<br/> * Description: <br/> * date: 2019/1/15 3:25 PM<br/> * * @author chengluchao * @since JDK 1.8 */ public class ConsumerListener { @Test public void consumMessage() { ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try { factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://2.2.2.4:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//客户端确认 destination = session.createQueue("test-listener"); consumer = session.createConsumer(destination); //注册监听器,注册成功后,队列中的消息变化,会自动触发监听器代码 consumer.setMessageListener(new MessageListener() { /* 监听器一旦注册,永久有效 永久 - consumer线程不关闭 处理消息的方式:只要有消息未处理,自动调用onMessage方法,处理消息 监听器可以注册若干。注册多个监听器,相当于集群 ActiveMQ自动的循环调用多个监听器,处理队列中的消息,并实现处理 处理消息的方法,就是监听方法 */ @Override public void onMessage(Message message) { try { //acknowledge方法,就是确认方法,代表consumer已经收到消息,确认后,MQ可以删除对应的消息 message.acknowledge(); ObjectMessage om = (ObjectMessage) message; Object data = om.getObject(); System.out.println(data); } catch (JMSException e) { e.getErrorCode(); } } }); //阻塞当前代码,保证listener代码结束,如果代码结束了,监听器自动关闭 System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { // 回收资源 if (consumer != null) { try { consumer.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } //会话对象 if (session != null) { try { session.close(); } catch (JMSException jmse) { 
jmse.printStackTrace(); } } //连接对象 if (connection != null) { try { connection.close(); } catch (JMSException jmse) { jmse.printStackTrace(); } } } } }