1.安装ActiveMQ服务器(略)
2.启动ActiveMQ,浏览器访问8161端口,默认账号admin/admin
3. 生产者代码
package test001;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Created by zhengqinfeng
* @Description :
* @Date : created in 23:44 2018/5/14
*/
public class ProducerMQ {
public static void main(String[] args) throws JMSException {
//创建MQ工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
//创建连接
Connection connection = factory.createConnection();
//启动连接
connection.start();
//创建会话工厂, false:表示不以事务方式进行提交; Session.AUTO_ACKNOWLEDGE 表示自动签收
// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Session.CLIENT_ACKNOWLEDGE:手动签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//创建队列
Queue testMQ = session.createQueue("testMQ1");
//创建生产者
MessageProducer producer = session.createProducer(testMQ);
//消息不持久化
// producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//持久化消息
//producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i <= 10; i++) {
System.out.println("###########我是生产者:"+i+"###############");
sendMsg(session, producer, "我是生产者:" + i);
}
System.out.println("###########发送消息完毕###############");
}
private static void sendMsg(Session session,MessageProducer producer,String i) throws JMSException {
TextMessage textMessage = session.createTextMessage("hello activeMQ " + i);
producer.send(textMessage);
}
}
4.消费者代码
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Created by zhengqinfeng
* @Description : 消费者
* @Date : created in 12:47 2018/5/15
*/
public class ConsumerMQ {
public static void main(String[] args) throws JMSException {
System.out.println("消费者1");
//创建MQ工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
//创建连接
Connection connection = factory.createConnection();
//启动连接
connection.start();
//创建会话工厂 AUTO_ACKNOWLEDGE:自动签收
// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 手动签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//创建队列
Queue testMQ = session.createQueue("testMQ1");
//创建生产者
MessageConsumer consumer = session.createConsumer(testMQ);
//第一种消费消息的方式
// while (true) {
// //获取消息
// TextMessage message = (TextMessage) consumer.receive();
// if (message != null) {
// String text = message.getText();
// System.out.println("消费者获取消息,text:" + text);
// //手动签收,如果没有手动签收,消息还是会存在于队列中的(当然这是在Session.CLIENT_ACKNOWLEDGE模式下)
// message.acknowledge();
// } else {
// break;
// }
// }
// System.out.println("消费者获取消息完毕>>>>>>>>>>>>>>>>>>>..");
//第二种消费消息的方式
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage m = (TextMessage) message;
try {
System.out.println("消费者获取消息:"+m.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
启动消费者代码,然后修改System.out.println("消费者1")为System.out.println("消费者2"),再次启动。模拟测试两个消费者
启动生产者
再查看消费者1:
查看消费者2:
结论:消费1和消费者均摊消费testMQ1这个队列中的消息。