Java Message Service(JMS)
JMS支持两种消息发送和接收模型。
一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。
一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。
概念:
Provider/MessageProvider:生产者
Consumer/MessageConsumer:消费者
PTP:Point To Point,点对点通信消息模型 Queue:队列,目标类型之一,和PTP结合
Pub/Sub:Publish/Subscribe,发布订阅消息模型 Topic:主题,目标类型之一,和Pub/Sub结合
ConnectionFactory:连接工厂,JMS用它创建连接
Connnection:JMS Client到JMS Provider的连接
Destination:消息目的地,由Session创建,目的地是队列或者主题。
Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的
61616 提供JMS服务端口 8261 控制台端口
消息类型 消息风格
StreamMessage Java原始值的数据流
MapMessage 一套名称-键值对
TextMessage 一个字符串对象
ObjectMessage 一个序列号的Java对象
BytesMessage 一个未解释字节的数据流
签收机制
签收就是消费者接受到消息后,需要告诉消息服务器,我收到消息了。当消息服务器收到回执后,本条消息将失效。因此签收将对PTP模式产生很大影响。如果消费者收到消息后,并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉!
Session.AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收
Session.CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收
Session.DUPS_OK_ACKNOWLEDGE:签不签收无所谓了,只要消费者能够容忍重复的消息接受,当然这样会降低Session的开销
消费者客户端成功接收一条消息一般包括三个阶段:
1、消费者接收消息,也即从MessageConsumer的receive(timeout)方法返回,timeout到达之后,自动断开连接
2、消费者处理消息
3、消息被签收
其中,第三阶段的签收可以有ActiveMQ发起,也可以由消费者客户端发起,取决于Session是否开启事务以及签收模式的设置。
** 在带事务的Session中,消费者客户端事务提交之时,消息自动完成签收。**
** 在不带事务的Session中,消息何时以及如何被签收取决于Session的签收模式设置**
在创建Session对象中,传入签收机制。
// Session: 一个发送或接收消息的线程 false:代表不带事务的session AUTO_ACKNOWLEDGE:代表自动签收
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
事务:
transaction
false 事务会主动提交,生产者发送消息后,会话会主动提交。
true 事务需要手动提交,在生产者发送完send消息后,会话关闭前,要进行 session.commit() 操作, 异常情况可以fallback()。
对于消费者而言,设置true时,如果没有提交,队列会认为消息没有消费,即使消费者已经取出消息了,这会造成消息重复消费。所有消费者事务要设置成false。
总结:事务偏向生产者,签收偏向消费者。
消费者的事务要关闭。
主题topic 一对多
生产者和消费者之间有时间相关性,订阅之后,消费者才能从订阅时间点后消费消息,生产者之前发布的消息不能消费。
生产者生产时,topic不保存消息,消息是无状态的,如果没有消费者消费,消息就是废消息,一般先启动消费者,在启动生产者。
JMS规范允许客户创建持久化订阅,在一定程度上放松了时间相关性,允许消费者消费生产者发布的历史消息,类似微信公众号。
比较 | Topic | Queue |
---|---|---|
工作模式 | 订阅-发布模式; 如果当前没有订阅者,消息将会别丢弃; 如果有多个订阅者,那么所有订阅者都会受到消息。 | 负载均衡模式。如果当前没有消费者,消息也不会丢弃。如果有多个消费者,那么一条消息也只会发送给一个消费者,并且要求消费者ack信息。 |
状态 | 无状态 | Queue数据默认会在MQ服务器上以文件的形式保存,ActiveMQ一般会保存在$AMQ_HOME/data/kr-store/data下面,也可以配置成DB存储。 |
传递完整性 | 如果没有订阅者,消息会被丢弃。 | 消息不会被丢弃。 |
处理效率 | 由于消息要按照订阅者的数量进行复制,所有处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异。 | 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低,当然不同消息协议的具体性能也是有差异的。 |
JMS消息
消息头:所有的消息都支持同样的一组消息头字段。消息头字段包含了客户端和提供者用于标识和路由消息的值。
消息属性:除了标准的消息头字段,还可以为消息提供多个可选的消息头字段。应用相关属性,标准属性(JMS提供的),提供者相关属性。
消息体:JMS定义了几种类型的消息体,这几种消息体覆盖了当前使用中的大部分消息风格。
消息头:
JMSDestination
JMSDeliveryMode:持久化模式和非持久化模式 DeliveryMode.NON_PERSISTENT DeliveryMode.PERSISTENT
JMSExpiration 默认是0,永不过期,
JMSPriority JMS定义从0级到9级的十级优先级。此外,客户端应优先考虑0-4为正常优先级, 5-9为高优先级。JMS不要求提供者严格实现消息的优先级顺序;但是,它应该尽最大努力优先于正常消息投递加急消息。
JMSMessageID JMSMessageID的值是一个字符串,在历史库作为标识消息的唯一键值。
消息属性:
以属性名,属性值成对存在的K-V键值对。
setStringPropertt setString 方法设置属性。
可以用来标识一个消息,识别消息。
消息中间件的传输协议:
ActiveMQ
Transmission Control Protocol(TCP) 默认 数据传输使用openwire
NEW I/O /API Protocol 性能比TCP更好
消息中间的存储和可持久化:
可持久化是将ActiveMQ的数据采用某种方式保存到磁盘上,数据库上,文件上等,这些是副本数据,与ActiveMQ在物理上相隔,当ActiveMQ宕机之后,数据也不会丢失,保证了高可用性,稳定性。
消息中间的高可用
消息中间件的高级特性:
异步投递 延时投递 定时投递 死信队列
在Java代码中使用Activemq
ActiveMQConnectionFactory
CachingConnectionFactory
JmsTemplate SimpleMessageListenerContainer MessageCreator
ActiveMQQueue
ActiveMQTopic
Destination
MessageListener
spring集合Activemq
JmsTemplate 模板对象需要两个引用,第一个是连接工厂,第二是 消息目的地
DefaultMessageListnerContainer 客户端接收者
springboot整合
配置文件节点信息
spring.activemq //配置连接
spring.jms.pub-sub-domain: false队列 true主题
JmsMessagingTemplate模板对象 ,用作消息生产者,生产消息
@EnableJms 在配置类上开启消息服务
客户端的 某个方法,用于接收消息服务的方法
@JmsListner(destination="") 该注解要标记在处理消息的方法上
public void receive(Message message){ code }这个类似spring中的设置监听器。