1.四种消息队列:
1.1(Storm-)Kafka
1.2RabbitMQ安全性最高
1.3RocketMQ==>阿里
1.4ActiveMQ
2.为什么要使用消息队列?
主要解决系统之间的通信问题
2.1. 尽量消除系统耦合性
2. 2.异步消息传递(异步通信)
2.3. 流量削峰
3.JMS(Java Message Service)Java 消息服务
3.1. JMS是Java EE的规范之一,定义了访问消息中间件的接口;
3.2. JMS规范指出消息传递应该是异步的、非阻塞的;
4.JMS的核心API
4.1. ConnectionFactory:连接工厂,用于创建Connection
4.2. Connection:客户端和MQ服务器的一次连接
4.3. Session:一次会话
4.4. Destination:生产者生产消息的目的地,消费者消费消息的来源
Queue:只能消费一次
Topic:可以消费多次
4.5. MessageProducer:消息生产者,用于将消息发送消息队列
4.6. MessageConsumer:消费者
4.7. Message:消息
TextMessage
MapMessage
ObjectMessage
BytesMessage
StreamMessage
4.8. MessageListener
5.JMS消息类型
5.1. 点对点消息(P2P): 一条消息只能被一个消费者消费,生产者和消费者没有时间上的依赖性
5.2. 发布订阅 : 一条消息可以被多个消费者消费
生产者和消费者有时间上的依赖性(生产者在生产消息的时候,至少应该有一个消费者处于在线状态)
可以创建一个持久化的消费者订阅队列
6.什么是ActiveMQ
1. ActiveMQ是最受欢迎的、功能强大的开源消息和集成服务器
2. ActiveMQ 速度快,支持跨语言的客户端和协议,很容易进行企业集成,支持许多高 级特性,完全支持JMS1.1和J2EE1.4
7.ActiveMQ的特点
7.1. 支持多语言客户端和协议,如Java、C、C++、Ruby、Perl、Python、PHP
7.2. 支持许多高级特性,如消息分组、虚拟目的地、通配符、组合目的地;
7.3. 完全支持JMS1.1和J2EE1.4;
7.4. 可以很容易集成到Spring应用程序中;
7.5. 通过大部分J2EE服务器的测试,如TomEE、JBoss、WebLogic等;
7.6. 支持高效的JDBC持久化方式;
7.7. 集群的支持;
7.8. ......
8.上代码
8.1生产者和消费者(原生状态)
/** * 生产者 */ public class HelloProducer { public static void main(String[] args) throws JMSException { // 1. 创建ConnectionFactory(用户名、密码、连接地址) // 集群的情况:“failover:(tcp://192.168.1.100:61616,tcp://192.168.1.101:61616,tcp://192.168.1.102:61616)?Randomize=false” ConnectionFactory factory = new ActiveMQConnectionFactory(null, null, "tcp://localhost:61616"); //2. Connection connection = factory.createConnection(); connection.start(); //3.创建session //参数(是否开启事务,客户端自动签收消息) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.创建Destination(目的地) //参数:队列名称,如果不存在则创建一个新的队列 Queue queue = session.createQueue("hello"); //5.创建生产者 MessageProducer producer = session.createProducer(queue); //6.创建消息,发送消息 for(int i= 0;i<10;i++){ TextMessage message = session.createTextMessage("这是第" + i + "条消息"); producer.send(message); } producer.close(); session.close(); connection.close(); System.out.println("发送完成"); } }
=============================================================================================
/**
* 消费者
*/
public class HelloConsumer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("hello");
MessageConsumer consumer = session.createConsumer(queue);
while(true){
TextMessage msg = (TextMessage)consumer.receive();
if(msg!=null){
System.out.println(msg.getText().toString());
Thread.sleep(1000);
}
}
}
}
8.2生产者和消费者(手动签收消息)
/** * 生产者 */ public class SelectorProducer { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory(null, null,"tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); //客户端手动签收消息 Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue("selector"); MessageProducer producer = session.createProducer(queue); TextMessage msg = session.createTextMessage("地址2"); msg.setIntProperty("age",12); msg.setStringProperty("name","et"); TextMessage msg2 = session.createTextMessage("地址2"); msg2.setIntProperty("age",2); msg2.setStringProperty("name","et"); producer.send(msg); producer.send(msg2); producer.close(); session.close(); connection.close(); System.out.println("发送成功"); } }
========================================================================================================
/**
* 消费者
*/
public class SelectConsumer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("selector");
MessageConsumer consumer = session.createConsumer(queue, "name = 'et' and age = 2");
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
//签收消息,通知队列删除消息
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}
8.3发布和订阅状态(及持久化订阅)
/** * 发布 */ public class HelloPublisher { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory(null, null, "tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("topic"); MessageProducer producer = session.createProducer(topic); MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name","et2006"); mapMessage.setInt("id",111111); producer.send(mapMessage); producer.close(); session.close(); connection.close(); System.out.println("发送完成"); } }
========================================================================================
/**
* 订阅
*/
public class HelloSubscriber {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new HelloListener());
}
}
class HelloListener implements MessageListener{
@Override
public void onMessage(Message message) {
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
int id = mapMessage.getInt("id");
String name = mapMessage.getString("name");
System.out.println(id + "-" + name);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
==========================================================================================
/**
* 持久化订阅者
*/
public class DurableSubscriber {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.setClientID("zs");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
MessageConsumer consumer = session.createDurableSubscriber(topic,"zs");
consumer.setMessageListener(message -> {
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
int id = mapMessage.getInt("id");
String name = mapMessage.getString("name");
System.out.println(id + "-" + name);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
9.activemq整合spring
D:五月五月课件workActiveMQactivemq-springmvc