1、JMS编码总体架构
2、队列和主题
在点对点的消息传递域中,目的地被称为队列(queue)
在发布订阅消息传递域中,目的地被称为主题(topic)
3、队列模式demo
1)pom文件中引入依赖
<dependencies> <!-- activemq 所需要的jar 包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.0</version> </dependency> <!-- activemq 和 spring 整合的基础包 --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> </dependencies>
2)生产者发送消息
/** * @author houChen * @date 2020/7/12 16:30 * @Description: */ public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://192.168.2.180:61616"; public static final String QUEUE_NAME ="queue01"; public static void main(String[] args) throws JMSException { //1、创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2、通过连接工程获取connection Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3、创建会话 //两个参数 1:事务 2:签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地 Queue queue = session.createQueue(QUEUE_NAME); //5、创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); //6、使用消息生产者生产3条消息发送到MQ的队列里面 for(int i=1;i<=3;i++){ //7、创建消息 TextMessage textMessage = session.createTextMessage("msg---" + i); //8、通过messageProducer发送给mq messageProducer.send(textMessage); } //9、释放资源 messageProducer.close(); session.close(); connection.close(); System.out.println("消息发送到MQ成功!!!"); } }
3) 消费者消费消息
/**
* @author houChen
* @date 2020/7/12 17:58
* @Description:
*/
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.2.180:61616";
public static final String QUEUE_NAME ="queue01";
public static void main(String[] args) throws JMSException {
//1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2、通过连接工程获取connection
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建会话
//两个参数 1:事务 2:签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地
Queue queue = session.createQueue(QUEUE_NAME);
//5、创建消息的消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
/*
同步阻塞方式 receive()
订阅者或接受这调用MessageConsumer的receive()方法来接受消息,receive()在收到消息之前将一直阻塞
*/
while (true){
TextMessage textMessage = (TextMessage)messageConsumer.receive();
if(null!=textMessage){
System.out.println("消费者接收到消息----"+textMessage.getText());
}else{
break;
}
}
//释放资源
messageConsumer.close();
session.close();
connection.close();
}
}
补充:使用监听器的方式来获取消息
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://192.168.2.180:61616"; public static final String QUEUE_NAME ="queue01"; public static void main(String[] args) throws JMSException, IOException { //1、创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2、通过连接工程获取connection Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3、创建会话 //两个参数 1:事务 2:签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地 Queue queue = session.createQueue(QUEUE_NAME); //5、创建消息的消费者 MessageConsumer messageConsumer = session.createConsumer(queue); // 通过监听的方式来接收消息 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(null!=message&&message instanceof TextMessage){ TextMessage textMessage=(TextMessage)message; try { System.out.println("消费者接收到消息:"+ textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); // System.in.read(); // 等待 监听器进行监听 messageConsumer.close(); session.close(); connection.close(); System.out.println("消息发送到MQ成功!!!"); } }
4、消费者三大消费情况