ActiveMQ 开发包下载及运行环境搭建
主页:http://activemq.apache.org/
目前最新版本:5.11.1
开发包及源码下载地址:http://activemq.apache.org/activemq-5111-release.html
ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码admin/admin
ActiveMQ下载完毕后,点击以下文件(64位和32位分别点击对应文件夹下的该文件):
ActiveMQ 服务器启动完毕后,打开http://127.0.0.1:8161/admin/ 用户名/密码admin/admin,将会见到如下界面:
ActiveMq 点对点消息实现
第一种方式:直接Receive 方式
1.Session.AUTO_ACKNOWLEDGE。当客户成功的从receive
方法返回的时候,或者从MessageListener.onMessage
方法成功返回的时候,会话自动确认客户收到的消息。
2.Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge 方法确认消息。需要注意的是,在这种模
式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一
个消息消费者消费了10 个消息,然后确认第5 个消息,那么所有10 个消息都被确认。
3.Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。如果JMS provider 失败,那么可
能会导致一些重复的消息。如果是重复的消息,那么JMS provider 必须把消息头的JMSRedelivered 字段设置
为true。
方法成功返回的时候,会话自动确认客户收到的消息。
2.Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge 方法确认消息。需要注意的是,在这种模
式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一
个消息消费者消费了10 个消息,然后确认第5 个消息,那么所有10 个消息都被确认。
3.Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。如果JMS provider 失败,那么可
能会导致一些重复的消息。如果是重复的消息,那么JMS provider 必须把消息头的JMSRedelivered 字段设置
为true。
首先我们创建Java工程,导入activemq-all-5.11.1.jar,如下
然后我们建立一个生产者类,负责发送消息,并创建一个名为MyQueue1的队列
package cn.com.evan.Jms.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsProducer { private static String USERNAME = ActiveMQConnection.DEFAULT_USER; private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; private static Integer SENDNUM = 10; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageProducer messageProducer; connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("MyQueue1");// Create message queue messageProducer = session.createProducer(destination);// Create producer sendMessage(session,messageProducer); session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{ for(int i=0;i<JmsProducer.SENDNUM;i++){ TextMessage message = session.createTextMessage("ActiveMQ"+i); messageProducer.send(message); System.out.println("发送消息:"+message.getText()); } } }
运行程序,可以看到控制台已经发送了10条信息
我们打开activeMq控制台http://127.0.0.1:8161/admin/,查看消息发送的情况,可以看到MyQueue1里已经存放着10条消息,但还没有被消费
接着我们创建一个消费者,负责从消息队列MyQueue1里接受消息,这里我们用一个死循环去不断监听消息
package cn.com.evan.Jms.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsComsumer { private static String USERNAME = ActiveMQConnection.DEFAULT_USER; private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; private static Integer SENDNUM = 10; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageConsumer messageConsumer; connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("MyQueue1");// Create message // queue messageConsumer = session.createConsumer(destination);// Create // producer while (true) { TextMessage message = (TextMessage) messageConsumer .receive(100000); if (message != null) { System.out.println("接收信息:" + message.getText()); } else break; } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }运行程序可以看到消费者已经接受到10条消息
我们再查看activeMq控制台,可以看到消息已经被消费了10条
第二种方式:使用监听器去监听消息
首先我们创建一个监听器,实现MessageListener 借口
package cn.com.evan.Jms.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class MyMessageListener implements MessageListener{ @Override public void onMessage(Message msg) { try { System.out.println("接受消息:"+((TextMessage)msg).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
我们把上面的JmsConsumer 修改下,加入我们自定义的监听器
package cn.com.evan.Jms.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsComsumer { private static String USERNAME = ActiveMQConnection.DEFAULT_USER; private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; private static Integer SENDNUM = 10; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageConsumer messageConsumer; connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("MyQueue1");// Create message // queue messageConsumer = session.createConsumer(destination);// Create // producer messageConsumer.setMessageListener(new MyMessageListener());// set listener which we created before } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }我们先运行JmsConsumer,让它监听MyQueue1队列里的消息,再运行JmsProducer
此时,我们在控制台可以看到消费者也接受到10条信息
打开activeMq控制台,可以看到消费者跟上面一样同样也消费了10条消息
以上两种消息处理方式介绍完毕,下一章会介绍一个发布者对多个消费者的发布订阅模式实例