说明:
- 使用的版本为apache-activemq-5.15.3
简述
下载
ActiveMQ官网下载地址:http://activemq.apache.org/download.html
ActiveMQ 提供了Windows、Linux、Unix 等几个版本,楼主这里选择在Linux 版本下进行开发。
解压后目录:
目录介绍:
bin存放的是脚本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是说明文档
examples存放的是简单的实例
lib存放的是activemq所需jar包
webapps存放的是Web项目
启动ActiveMQ:【linux】
ActiveMQ 在linux 下的启动命令是 ./activemq start
终止ActiveMQ:【linux】
ActiveMQ 在linux 下的终止命令是 ./activemq stop
安装,启动
Queue测试
消息生产者--JMSProducer.java
package activeMqProject.amp; import javax.jms.*; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 --> null private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 --> null private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 -->failover://tcp://localhost:61616 //private static final int SENDNUM = 2;//发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageProducer messageProducer = null;//消息的生产者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue");//创建消息队列 messageProducer = session.createProducer(destination);//创建消息生产者 sendMessage(session, messageProducer);//发送消息 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } //发送消息 private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException { for (int i = 0; i < 2; i++) { //创建一条文本消息 TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i); System.out.println("发送消息:Activemq 发送消息" + i); //通过消息生产者发出消息 messageProducer.send(message); } /*try { //创建消息Map<key,value> MapMessage message = session.createMapMessage(); message.setString("userName", "syf"); message.setInt("age", 30); message.setDouble("salary", 1000); message.setBoolean("isGirl", true); 
System.out.println("Sending:" + ((ActiveMQMapMessage)message).getContentMap()); //发送消息 messageProducer.send(message); } catch (JMSException e) { e.printStackTrace(); }*/ } }
消息消费者--JMSComsumer.java
package activeMqProject.amp; import java.io.IOException; import javax.jms.*; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSComsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) throws JMSException, IOException { ConnectionFactory connectionFactory; Connection connection; Session session; connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("TestQueue"); MessageConsumer consumer = session.createConsumer(queue); while (true) { TextMessage textMessage = (TextMessage) consumer.receive(100000); if(textMessage != null){ System.out.println("收到的消息:" + textMessage.getText()); }else { break; } } //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); } }
Topic测试
消息生产者--JmsTopicProducer.java
package activeMqProject.amp; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsTopicProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 --> null private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 --> null private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 -->failover://tcp://localhost:61616 public static void main(String[] args) throws JMSException { //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(JmsTopicProducer.USERNAME, JmsTopicProducer.PASSWORD, JmsTopicProducer.BROKERURL); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageProducer producer = session.createProducer(topic); //7、使用会话对象创建一个消息对象 TextMessage textMessage = session.createTextMessage("hello!test-topic"); //8、发送消息 producer.send(textMessage); //9、关闭资源 producer.close(); session.close(); connection.close(); } }
消息消费者--JmsTopicComsumer.java
package activeMqProject.amp; import java.io.IOException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsTopicComsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 --> null private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 --> null private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 -->failover://tcp://localhost:61616 public static void main(String[] args) throws JMSException, IOException { //1、创建工厂连接对象,需要制定ip和端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(JmsTopicComsumer.USERNAME, JmsTopicComsumer.PASSWORD, JmsTopicComsumer.BROKERURL); //2、使用连接工厂创建一个连接对象 Connection connection = connectionFactory.createConnection(); //3、开启连接 connection.start(); //4、使用连接对象创建会话(session)对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) Topic topic = session.createTopic("test-topic"); //6、使用会话对象创建生产者对象 MessageConsumer consumer = session.createConsumer(topic); //7、向consumer对象中设置一个messageListener对象,用来接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(message instanceof TextMessage){ TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); //8、程序等待接收用户消息 System.in.read(); //9、关闭资源 consumer.close(); session.close(); connection.close(); } 
}