消息中间件
在说activemq之前,首先要说下‘中间件’。百度百科对于中间件的理解是:
看上去很不好理解,那么下面我用我的理解简单解释下什么是中间件:
就拿生活中网上购物举例子,从快递点--送到买家,一个快递员需要一次送很多家,如果每家都送到门口,那么无疑加重了快递员的工作,效率也不高,如果快递员将快递都送到‘蜂巢’点,谁需要就去‘蜂巢’中去取,整体的效率就会有效的提高,那么这个蜂巢就是‘中间件’。在这个场景下,有了‘蜂巢’代收有什么好处呢?比如一个快递员去一家送货,如果这家没有人就会导致此次送货失败,如果有了‘蜂巢’,就不会出现这样的问题了。
那么在程序中,在客户端与服务器进行通讯时,客户端调用服务端接口后,必须等待服务端完成处理后返回结果给客户端才能继续执行,这种情况属于同步调用方式。如果服务器端发生网络延迟、不可达的情况,可能客户端也会受到影响。那么使用了消息中间件,客户端只需要将消息发给就中间件就可以了,在合适的时候中间件会自动将消息传给服务器进行处理。
消息队列
消息队列是消息中间件的一种实现方式,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
应用解耦
比如拿网上购物系统来说,现在有两个模块,分别是购物系统和库存系统。传统的逻辑是,订单系统将下订单的消息传给库存系统,库存系统接受到消息去处理,那么如果库存系统出现了问题,就会导致整个订货流程失败。
如果使用了消息中间件,订单系统将订单的消息传给中间件就可以了,不用再去管中间件和库存系统之间是如何交互的了。这样就实现了购物系统和库存系统的解耦。
那么这里有些人可能会有疑问,如果中间件也出现了问题呢,岂不是也会导致整个流程失败。这里涉及到了中间件的持久化问题,下面会详细的讲解,简单的说就是中间件如果崩溃了,它的持久化设置,仍会将消息保存下来,等到中间件重新启动,再自动或手动处理即可。
异步消息
比如拿网上购车票来说,购票成功后,一般都会向购买人发送短信和邮件提醒购票成功。传统的逻辑是,购票成功后,系统先执行发送短信的操作,之后再执行发送邮件的操作,这样同步的执行速度会很慢。
如果使用了消息中间件,购票成功后,将发送短信和发送邮件的消息内容传给消息中间件,之后发送短信和发送邮件的模块自己去中间件中取消息,之后执行逻辑,这样异步的执行速度无疑会加快系统的速度。
流量削峰
比如拿商品秒杀活动举例,短时间内上万,百万次的请求很有可能导致应用挂掉,一般处理办法是在应用前端加上消息队列来控制活动人数。
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;秒杀业务根据消息队列中的请求信息,再做后续处理。
常见的消息中间件对比
目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。
JMS的定义
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
简单的说:JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。(类似于JDBC)
JMS的角色组成
JMS提供者:实现JMS规范的消息中间件服务器 (存放消息容器)
JMS客户端:发送或接收消息的应用程序
JMS生产者/发布者:创建并发送消息的客户端(向消息容器存放消息)
JMS消费者/订阅者:接收并处理消息的客户端
JMS消息:应用程序之间传递的数据内容
JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
JMS主题:一种支持发送消息给多个订阅者的机制。
JMS消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式 点对点与发布订阅模式。
JMS消息
JMS Message消息由三部分组成:1. 消息头。2. 消息体。3. 消息属性。
消息头
JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:下面列出一些重要的消息头
1. JMSDestination:消息发送的Destination,在发送过程中由提供者设置。
2. JMSMessageID:唯一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的JMSMessageID。
3. JMSDeliveryMode:消息持久化。包含值DeliveryMode.PERSISTENT或者DeliveryMode.NON_PERSISTENT。
4. JMSTimestamp:提供者发送消息的时间,由提供者在发送过程中设置。
5. JMSExpiration:消息失效的时间,值0表明消息不会过期,默认值为0。
6. JMSPriority:消息的优先级,由提供者在发送过程中设置。优先级0的优先级最低,优先级9的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送,默认值为4。
7. JMSCorrelationID:通常用来链接响应消息与请求消息,由发送消息的JMS程序设置。
8. JMSReplyTo:请求程序用它来指出回复消息应发送的地方,由发送消息的JMS程序设置。
9. JMSType:JMS程序用它来指出消息的类型。
10.JMSRedelivered:消息的重发标志,false,代表该消息是第一次发生,true,代表该消息为重发消息。
不过需要注意的是,在传送消息时,消息头的值由JMS提供者来设置,因此开发者使用以上setJMSXXXX()方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的:JMSCorrelationID,JMSReplyTo,JMSType。
消息体
在消息体中,JMS API定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供了对已有消息格式的兼容。不同的消息类型如下:
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
1. TextMessage:一个字符串对象
2. MapMessage:一套名称-值对
3. ObjectMessage:一个序列化的java对象
4. BytesMessage:一个字节的数据流
5. StreamMessage:java原始值的数据流
消息属性
我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择性的提供部分标准属性。
1 message.setStringProperty("Property",Property);//自定义属性
消息模式
有两种模型:点对点/发布订阅模式,区别是一对一和一对多。
点对点(P2P)
即生产者和消费者之间的消息往来。
点对点模型的特点:
1. 每个消息只有一个消费者(即一旦被消费,消息就不再在消息队列中)
2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。
3. 接收者在成功接收消息之后需要向队列应答成功。
发布订阅(Pub/Sub)
包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。
发布订阅模型的特点:
1. 每个消息可以有多个消费者。
2. 发布者和订阅者之间有时间上的依赖性。
3. 订阅者必须保持运行的状态,才能接受发布的消息。不过为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
JMS消息正文格式
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
1. StreamMessage:java原始值得数据流
2. MapMessage:一套名称-值对
3. TextMessage:一个字符串对象(常用)
4. ObjectMessage:一个序列化得java对象
5. BytesMessage:一个字节得数据流
JMS编程API
1. ConnectionFactory:创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。
2. Destination:意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue,Topic。
3. Connection:表示在客户端和JMS系统之间建立的连接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session
4. Session:Session是我们对消息进行操作的接口,可以通过session创建生产者,消费者,消息等。Session提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。
5. Producter:消息生产者。由session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
6. Consumer:消息消费者。消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
7. MessageListener:消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MesasgeListener。
ActiveMQ的使用方法(queue和topic)
首先需要在pom.xml中引入依赖
1 <dependency> 2 <groupId>org.apache.activemq</groupId> 3 <artifactId>activemq-all</artifactId> 4 </dependency>
quene的发送代码
1 public void testMQProducerQueue() throws Exception{ 2 3 //1、创建工厂连接对象,需要制定ip和端口号 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); 5 //2、使用连接工厂创建一个连接对象 6 Connection connection = connectionFactory.createConnection(); 7 //3、开启连接 8 connection.start(); 9 //4、使用连接对象创建会话(session)对象 10 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 11 //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) 12 Queue queue = session.createQueue("test-queue"); 13 //6、使用会话对象创建生产者对象 14 MessageProducer producer = session.createProducer(queue); 15 //7、使用会话对象创建一个消息对象 16 TextMessage textMessage = session.createTextMessage("hello!test-queue"); 17 //8、发送消息 18 producer.send(textMessage); 19 //9、关闭资源 20 producer.close(); 21 session.close(); 22 connection.close(); 23 24 }
queue的接收代码(监听器模式)
注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
1 public void TestMQConsumerQueue() throws Exception{ 2 3 //1、创建工厂连接对象,需要指定ip和端口号 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); 5 //2、使用连接工厂创建一个连接对象 6 Connection connection = connectionFactory.createConnection(); 7 //3、打开连接 8 connection.start(); 9 //4、使用连接对象创建会话(session)对象 10 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 11 //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) 12 Queue queue = session.createQueue("test-queue"); 13 //6、创建消息的消费者 14 MessageConsumer consumer = session.createConsumer(queue); 15 //7、设置消息监听器来接收消息 16 consumer.setMessageListener(new MessageListener() { 17 //处理消息 18 @Override 19 public void onMessage(Message message) { 20 if(message instanceof TextMessage){ 21 TextMessage textMessage = (TextMessage)message; 22 try { 23 System.out.println(textMessage.getText()); 24 } catch (JMSException e) { 25 e.printStackTrace(); 26 } 27 } 28 } 29 }); 30 //8、程序等待接收用户消息 31 System.in.read(); 32 //9、关闭资源 33 consumer.close(); 34 session.close(); 35 connection.close(); 36 }
queue的接收代码(receive方法)
1 public void TestMQConsumerQueue() throws Exception{ 2 3 //1、创建工厂连接对象,需要指定ip和端口号 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); 5 //2、使用连接工厂创建一个连接对象 6 Connection connection = connectionFactory.createConnection(); 7 //3、打开连接 8 connection.start(); 9 //4、使用连接对象创建会话(session)对象 10 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 11 //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) 12 Queue queue = session.createQueue("test-queue"); 13 //6、创建消息的消费者 14 MessageConsumer consumer = session.createConsumer(queue); 15 //7、接收消息 16 while(true){ 17 Message message = consumer.receive(); 18 //如果已经没有消息了,结束了 19 if(message==null){ 20 break; 21 } 22 //如果还有消息,判断什么类型的消息 23 if(message instanceof TextMessage){ 24 TextMessage textMessage = (TextMessage)message; 25 System.out.println("接收的消息"+textMessage.getText()); 26 } 27 } 28 //8、程序等待接收用户消息 29 System.in.read(); 30 //9、关闭资源 31 consumer.close(); 32 session.close(); 33 connection.close(); 34 }
topic的发送代码
1 public void TestTopicProducer() throws Exception{ 2 3 //1、创建工厂连接对象,需要制定ip和端口号 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); 5 //2、使用连接工厂创建一个连接对象 6 Connection connection = connectionFactory.createConnection(); 7 //3、开启连接 8 connection.start(); 9 //4、使用连接对象创建会话(session)对象 10 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 11 //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) 12 Topic topic = session.createTopic("test-topic"); 13 //6、使用会话对象创建生产者对象 14 MessageProducer producer = session.createProducer(topic); 15 //7、使用会话对象创建一个消息对象 16 TextMessage textMessage = session.createTextMessage("hello!test-topic"); 17 //8、发送消息 18 producer.send(textMessage); 19 //9、关闭资源 20 producer.close(); 21 session.close(); 22 connection.close(); 23 }
topic的接收代码
1 public void TestTopicConsumer() throws Exception{ 2 3 //1、创建工厂连接对象,需要制定ip和端口号 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616"); 5 //2、使用连接工厂创建一个连接对象 6 Connection connection = connectionFactory.createConnection(); 7 //3、开启连接 8 connection.start(); 9 //4、使用连接对象创建会话(session)对象 10 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 11 //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多) 12 Topic topic = session.createTopic("test-topic"); 13 //6、使用会话对象创建生产者对象 14 MessageConsumer consumer = session.createConsumer(topic); 15 //7、向consumer对象中设置一个messageListener对象,用来接收消息 16 consumer.setMessageListener(new MessageListener() { 17 @Override 18 public void onMessage(Message message) { 19 // TODO Auto-generated method stub 20 if(message instanceof TextMessage){ 21 TextMessage textMessage = (TextMessage)message; 22 try { 23 System.out.println(textMessage.getText()); 24 } catch (JMSException e) { 25 // TODO Auto-generated catch block 26 e.printStackTrace(); 27 } 28 } 29 } 30 }); 31 //8、程序等待接收用户消息 32 System.in.read(); 33 //9、关闭资源 34 consumer.close(); 35 session.close(); 36 connection.close(); 37 }
参考:
https://www.cnblogs.com/cxyyh/p/10700437.html
https://www.cnblogs.com/Soy-technology/p/11546530.html
https://blog.csdn.net/qq_33404395/article/details/80590113
持续更新!!!!